MqttSubscriber.java

package de.dlr.bt.stc.source.mqtt;

import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Optional;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
@Builder
@Data
public class MqttSubscriber implements IMqttMessageListener, MqttCallbackExtended {

  private final IMqttClient client;
  private final EventBus eventBus;
  private final String sourceKey;
  private final int qos;
  private final String topic;
  private final SourceDataType dataType;

  public void start() throws MqttException {
    client.subscribe(topic, qos, this);
    client.setCallback(this);
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws MqttException {
    log.debug("MqttSubscriber: message arrived for topic {} and sourceKey {}", topic, sourceKey);

    Optional.ofNullable(message.getPayload())
        .map(payLoad -> MqttDataConverter.convertPayload(payLoad, dataType))
        .map(this::createEvent)
        .ifPresent(eventBus::post);
  }

  private DataAvailableEvent createEvent(Object payObj) {
    long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();
    return new DataAvailableEvent(sourceKey, payObj, timestamp, Collections.emptyMap());
  }

  @Override
  public void connectionLost(Throwable cause) {
    log.warn("MQTT Source {}: connection lost", sourceKey, cause);
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
    log.debug("MQTT Source {}: delivery complete", sourceKey);
  }

  @Override
  public void connectComplete(boolean reconnect, String serverURI) {
    if (!reconnect) {
      log.info("MQTT Source {}: connecting complete to {}", sourceKey, serverURI);
      return;
    }
    try {
      log.info("MQTT Source {}: reconnecting to {}", sourceKey, serverURI);
      start();
    } catch (MqttException e) {
      log.error("MQTT Source {}: problems reconnecting to {}", sourceKey, e, serverURI);
    }
  }

  public void close() throws MqttException {
    client.disconnect();
    client.close();
  }
}