All files / src/config / mqtt.ts

0% Statements 0/40
0% Branches 0/13
0% Functions 0/15
0% Lines 0/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69                                                                                                                                         
import mqtt, { MqttClient } from 'mqtt';
import { MqttMessageHandler } from '../core/mqttHandler';
import { DataService   } from '../core/dataService';
 
// Module-level message handler shared by every MqttConnection in this file.
// It is constructed without a client; connectToBroker() supplies the live
// client later through mqttHandler.setClient().
const mqttHandler = new MqttMessageHandler(null); // initially null
// Data service built on top of the shared handler. connectToBroker() calls its
// startLoggingAirSensingData() and startTrilaterationProcessing() methods.
const mqttService = new DataService(mqttHandler);
 
/**
 * Manages a single MQTT client connection to the configured broker.
 *
 * The underlying client is created with `reconnectPeriod: 0`, so this class
 * performs reconnection itself: whenever the current client closes or errors,
 * one reconnect attempt is scheduled 10 seconds later. Incoming messages are
 * forwarded to the module-level `mqttHandler`.
 */
export class MqttConnection {
  /** The current client, or `null` before the first connect or while it is being replaced. */
  public client: MqttClient | null = null;
  /** Broker URL, taken from `MQTT_BROKER_URL` with a localhost fallback. */
  private readonly mqttBrokerUrl = process.env.MQTT_BROKER_URL || 'mqtt://localhost:1883';
  /** Handle of the pending reconnect timer; `null` when no reconnect is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * Opens a new connection to the broker, replacing any existing client.
   *
   * @param topics Topics to subscribe to (QoS 1) once the connection is established;
   *               the same list is reused for every automatic reconnect.
   */
  public connectToBroker(topics: string[]): void {
    console.log(`[MQTT] Connecting to broker: ${this.mqttBrokerUrl}`);

    // This call supersedes any reconnect that is still waiting to fire.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // Ensure the previous client is cleaned up before creating a new one.
    const previousClient = this.client;
    if (previousClient) {
      console.warn('[MQTT] Cleaning up previous client before reconnecting');
      // Detach first so the old client's 'close'/'error' handlers see it is stale
      // and do not schedule an extra reconnect.
      this.client = null;
      previousClient.end(true); // Forcefully close the previous client
    }

    const client = mqtt.connect(this.mqttBrokerUrl, { reconnectPeriod: 0, keepalive: 60 });
    this.client = client;

    mqttHandler.setClient(client); // Set the new client in the handler

    client.on('connect', () => this.onConnect(topics));
    client.on('message', (topic, message) => this.onMessage(topic, message));
    client.on('error', (error) => {
      // Ignore errors from a client that has already been replaced; otherwise
      // onError() would tear down the newer connection.
      if (this.client !== client) {
        return;
      }
      this.onError(topics, error);
    });
    client.on('close', () => {
      // A superseded client closing is expected and must not trigger a reconnect.
      if (this.client !== client) {
        return;
      }
      console.warn('[MQTT] Disconnected. Will attempt reconnect');
      this.scheduleReconnect(topics);
    });

    // Start periodic tasks in the MqttService.
    // NOTE(review): these run on every (re)connect; confirm that DataService
    // guards against starting the same periodic task more than once.
    mqttService.startLoggingAirSensingData();
    mqttService.startTrilaterationProcessing();
  }

  /** Subscribes the current client to every topic and logs the outcome per topic. */
  private onConnect(topics: string[]): void {
    console.log(`[MQTT] Connected to broker: ${this.mqttBrokerUrl}`);
    topics.forEach((topic) => {
      this.client?.subscribe(topic, { qos: 1 }, (err) => {
        if (err) {
          console.error(`[MQTT] Subscription error for topic ${topic}:`, err.message);
        } else {
          console.log(`[MQTT] Subscribed to: ${topic}`);
        }
      });
    });
  }

  /** Forwards an incoming message, decoded to a string, to the shared handler. */
  private onMessage(topic: string, message: Buffer): void {
    mqttHandler.handleMessage(topic, message.toString());
  }

  /**
   * Logs the error, force-closes the current client and schedules a reconnect.
   * The 'close' handler may request a reconnect for the same failure;
   * scheduleReconnect() collapses both requests into one timer.
   */
  private onError(topics: string[], error: Error): void {
    console.error(`[MQTT] Error: ${error.message}`);
    this.client?.end(true, () => this.scheduleReconnect(topics));
  }

  /**
   * Schedules a single reconnect attempt 10 seconds from now.
   * Does nothing if a reconnect is already pending, so repeated failure
   * notifications cannot stack up multiple timers.
   */
  private scheduleReconnect(topics: string[]): void {
    if (this.reconnectTimer !== null) {
      return;
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connectToBroker(topics);
    }, 10000);
    console.log('[MQTT] Reconnect scheduled in 10s');
  }

  /** Returns the current client, or `null` if none is connected. */
  public getClient(): MqttClient | null {
    return this.client;
  }
}