// MqttSource.java

package de.dlr.bt.stc.source.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.exceptions.STCException;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.source.ISource;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Builder
public class MqttSource implements ISource {

  private final SourceMQTTCfg sourceConfig;
  private final MqttFactory mqttFactory;
  private final EventBus instanceEventBus;
  @Getter
  private final MqttClient mqttClient;
  @Getter
  private final String taskId;

  private MqttSubscriber mqttSubscriber;

  @Override
  public void initializeTask() throws SourceConfigurationException {
    try {
      MqttConfigValidator.validateSourceConfiguration(sourceConfig);
      
      if(!mqttClient.isConnected()){
        mqttClient.connect(mqttFactory.createMqttClientOptions(sourceConfig));
      }
      
      mqttSubscriber = mqttFactory.createMqttSubscriber(mqttClient, sourceConfig, instanceEventBus);
    } catch (MqttException e) {
      log.error("MQTT Source {}: Problems creating MQTT Subscriber {}", getTaskId(), e);
      throw new SourceConfigurationException(
          "MQTT Source {}: Problems creating MQTT Subscriber " + getTaskId(), e);
    }
  }

  @Override
  public void startTask() throws STCException {
    try {
      mqttSubscriber.start();
    } catch (MqttException e) {
      throw new STCException("Problems connecting to MQTT TOPIC : " + sourceConfig.getTopic(), e);
    }
  }

  @Override
  public void stopTask() {
    try {
      mqttClient.disconnect();
      mqttClient.close();
    } catch (MqttException e) {
      log.error("MQTT Source {}: Problems disconnecting MQTT Subscriber", getTaskId());
    }
  }
}