1// Edge gateway MQTT transport — handles connection, QoS,
2// and exponential backoff for unreliable cellular links.
3import mqtt, { type MqttClient } from "mqtt";
4import { TelemetryEnvelope } from "../schemas/telemetry";
5 
6const RECONNECT_BASE_MS = 1_000;
7const RECONNECT_MAX_MS = 30_000;
8 
9export interface MqttTransportOptions {
10 brokerUrl: string;
11 deviceId: string;
12 caCert: string;
13 clientCert: string;
14 clientKey: string;
15}
16 
17export class MqttTransport {
18 private client: MqttClient | null = null;
19 private attempts = 0;
20 
21 constructor(private readonly opts: MqttTransportOptions) {}
22 
23 async connect(): Promise<void> {
24 this.client = mqtt.connect(this.opts.brokerUrl, {
25 clientId: this.opts.deviceId,
26 ca: this.opts.caCert,
27 cert: this.opts.clientCert,
28 key: this.opts.clientKey,
29 keepalive: 30,
30 reconnectPeriod: 0, // managed manually
31 clean: false,
32 });
33 
34 this.client.on("connect", () => {
35 this.attempts = 0;
36 console.log("[mqtt] connected to", this.opts.brokerUrl);
37 });
38 
39 this.client.on("close", () => this.scheduleReconnect());
40 }
41 
42 async publishTelemetry(env: TelemetryEnvelope): Promise<void> {
43 if (!this.client?.connected) throw new Error("transport not connected");
44 const topic = `devices/${this.opts.deviceId}/telemetry`;
45 await this.client.publishAsync(topic, JSON.stringify(env), { qos: 1 });
46 }
47 
48 private scheduleReconnect() {
49 const delay = Math.min(RECONNECT_BASE_MS * 2 ** this.attempts, RECONNECT_MAX_MS);
50 this.attempts += 1;
51 setTimeout(() => this.connect(), delay);
52 }
53}

CoDesigneredge-gateway · architecture

drafted ADR-0007 · 12s ago

thinking

CoBuilderfirmware/mqtt · esp32-s3

platformio run · 16.4s

running

Operatorfleet · region-eu · 142 devices

diag on dev_a91k2c0qz7lf · 2m ago

idle