1// Edge gateway MQTT transport — handles connection, QoS,2// and exponential backoff for unreliable cellular links.3import mqtt, { type MqttClient } from "mqtt";4import { TelemetryEnvelope } from "../schemas/telemetry";5 6const RECONNECT_BASE_MS = 1_000;7const RECONNECT_MAX_MS = 30_000;8 9export interface MqttTransportOptions {10 brokerUrl: string;11 deviceId: string;12 caCert: string;13 clientCert: string;14 clientKey: string;15}16 17export class MqttTransport {18 private client: MqttClient | null = null;19 private attempts = 0;20 21 constructor(private readonly opts: MqttTransportOptions) {}22 23 async connect(): Promise<void> {24 this.client = mqtt.connect(this.opts.brokerUrl, {25 clientId: this.opts.deviceId,26 ca: this.opts.caCert,27 cert: this.opts.clientCert,28 key: this.opts.clientKey,29 keepalive: 30,30 reconnectPeriod: 0, // managed manually31 clean: false,32 });33 34 this.client.on("connect", () => {35 this.attempts = 0;36 console.log("[mqtt] connected to", this.opts.brokerUrl);37 });38 39 this.client.on("close", () => this.scheduleReconnect());40 }41 42 async publishTelemetry(env: TelemetryEnvelope): Promise<void> {43 if (!this.client?.connected) throw new Error("transport not connected");44 const topic = `devices/${this.opts.deviceId}/telemetry`;45 await this.client.publishAsync(topic, JSON.stringify(env), { qos: 1 });46 }47 48 private scheduleReconnect() {49 const delay = Math.min(RECONNECT_BASE_MS * 2 ** this.attempts, RECONNECT_MAX_MS);50 this.attempts += 1;51 setTimeout(() => this.connect(), delay);52 }53}CoDesigneredge-gateway · architecture
drafted ADR-0007 · 12s ago
CoBuilderfirmware/mqtt · esp32-s3
platformio run · 16.4s
Operatorfleet · region-eu · 142 devices
diag on dev_a91k2c0qz7lf · 2m ago