Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | import mqtt, { MqttClient } from 'mqtt'; import { MqttMessageHandler } from '../core/mqttHandler'; import { DataService } from '../core/dataService'; const mqttHandler = new MqttMessageHandler(null); // initially null const mqttService = new DataService(mqttHandler); export class MqttConnection { public client: MqttClient | null = null; private readonly mqttBrokerUrl = process.env.MQTT_BROKER_URL || 'mqtt://localhost:1883'; public connectToBroker(topics: string[]): void { console.log(`[MQTT] Connecting to broker: ${this.mqttBrokerUrl}`); // Ensure the previous client is cleaned up before creating a new one Iif (this.client) { console.warn('[MQTT] Cleaning up previous client before reconnecting'); this.client.end(true); // Forcefully close the previous client this.client = null; } this.client = mqtt.connect(this.mqttBrokerUrl, { reconnectPeriod: 0, keepalive: 60 }); mqttHandler.setClient(this.client); // Set the new client in the handler this.client.on('connect', () => this.onConnect(topics)); this.client.on('message', (topic, message) => this.onMessage(topic, message)); this.client.on('error', (error) => this.onError(topics, error)); this.client.on('close', () => { console.warn('[MQTT] Disconnected. Will attempt reconnect'); this.scheduleReconnect(topics); }); // Start periodic tasks in the MqttService mqttService.startLoggingAirSensingData(); mqttService.startTrilaterationProcessing(); } private onConnect(topics: string[]): void { console.log(`[MQTT] Connected to broker: ${this.mqttBrokerUrl}`); topics.forEach((topic) => { this.client?.subscribe(topic, { qos: 1 }, (err) => { if (err) { console.error(`[MQTT] Subscription error for topic ${topic}:`, err.message); } else { console.log(`[MQTT] Subscribed to: ${topic}`); } }); }); } private onMessage(topic: string, message: Buffer): void { mqttHandler.handleMessage(topic, message.toString()); } private onError(topics: string[], error: Error): void { console.error(`[MQTT] Error: ${error.message}`); this.client?.end(true, () => this.scheduleReconnect(topics)); } private scheduleReconnect(topics: string[]): void { setTimeout(() => this.connectToBroker(topics), 10000); console.log('[MQTT] Reconnect scheduled in 10s'); } public getClient(): MqttClient | null { return this.client; } } |