// MqttSource.java
package de.dlr.bt.stc.source.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.exceptions.STCException;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.source.ISource;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Builder
public class MqttSource implements ISource {
private final SourceMQTTCfg sourceConfig;
private final MqttFactory mqttFactory;
private final EventBus instanceEventBus;
@Getter
private final MqttClient mqttClient;
@Getter
private final String taskId;
private MqttSubscriber mqttSubscriber;
@Override
public void initializeTask() throws SourceConfigurationException {
try {
MqttConfigValidator.validateSourceConfiguration(sourceConfig);
if(!mqttClient.isConnected()){
mqttClient.connect(mqttFactory.createMqttClientOptions(sourceConfig));
}
mqttSubscriber = mqttFactory.createMqttSubscriber(mqttClient, sourceConfig, instanceEventBus);
} catch (MqttException e) {
log.error("MQTT Source {}: Problems creating MQTT Subscriber {}", getTaskId(), e);
throw new SourceConfigurationException(
"MQTT Source {}: Problems creating MQTT Subscriber " + getTaskId(), e);
}
}
@Override
public void startTask() throws STCException {
try {
mqttSubscriber.start();
} catch (MqttException e) {
throw new STCException("Problems connecting to MQTT TOPIC : " + sourceConfig.getTopic(), e);
}
}
@Override
public void stopTask() {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
log.error("MQTT Source {}: Problems disconnecting MQTT Subscriber", getTaskId());
}
}
}