MqttSubscriber.java
package de.dlr.bt.stc.source.mqtt;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Optional;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
@Builder
@Data
public class MqttSubscriber implements IMqttMessageListener, MqttCallbackExtended {
private final IMqttClient client;
private final EventBus eventBus;
private final String sourceKey;
private final int qos;
private final String topic;
private final SourceDataType dataType;
public void start() throws MqttException {
client.subscribe(topic, qos, this);
client.setCallback(this);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
log.debug("MqttSubscriber: message arrived for topic {} and sourceKey {}", topic, sourceKey);
Optional.ofNullable(message.getPayload())
.map(payLoad -> MqttDataConverter.convertPayload(payLoad, dataType))
.map(this::createEvent)
.ifPresent(eventBus::post);
}
private DataAvailableEvent createEvent(Object payObj) {
long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();
return new DataAvailableEvent(sourceKey, payObj, timestamp, Collections.emptyMap());
}
@Override
public void connectionLost(Throwable cause) {
log.warn("MQTT Source {}: connection lost", sourceKey, cause);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.debug("MQTT Source {}: delivery complete", sourceKey);
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (!reconnect) {
log.info("MQTT Source {}: connecting complete to {}", sourceKey, serverURI);
return;
}
try {
log.info("MQTT Source {}: reconnecting to {}", sourceKey, serverURI);
start();
} catch (MqttException e) {
log.error("MQTT Source {}: problems reconnecting to {}", sourceKey, e, serverURI);
}
}
public void close() throws MqttException {
client.disconnect();
client.close();
}
}