1 package de.dlr.bt.stc.source.mqtt; 2 3 import java.time.ZonedDateTime; 4 import java.util.Collections; 5 import java.util.Optional; 6 7 import org.eclipse.paho.client.mqttv3.IMqttClient; 8 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 9 import org.eclipse.paho.client.mqttv3.IMqttMessageListener; 10 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; 11 import org.eclipse.paho.client.mqttv3.MqttException; 12 import org.eclipse.paho.client.mqttv3.MqttMessage; 13 import org.greenrobot.eventbus.EventBus; 14 15 import de.dlr.bt.stc.eventbus.DataAvailableEvent; 16 import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType; 17 import lombok.AllArgsConstructor; 18 import lombok.Builder; 19 import lombok.Data; 20 import lombok.extern.slf4j.Slf4j; 21 22 @Slf4j 23 @AllArgsConstructor 24 @Builder 25 @Data 26 public class MqttSubscriber implements IMqttMessageListener, MqttCallbackExtended { 27 28 private final IMqttClient client; 29 private final EventBus eventBus; 30 private final String sourceKey; 31 private final int qos; 32 private final String topic; 33 private final SourceDataType dataType; 34 35 public void start() throws MqttException { 36 client.subscribe(topic, qos, this); 37 client.setCallback(this); 38 } 39 40 @Override 41 public void messageArrived(String topic, MqttMessage message) throws MqttException { 42 log.debug("MqttSubscriber: message arrived for topic {} and sourceKey {}", topic, sourceKey); 43 44 Optional.ofNullable(message.getPayload()) 45 .map(payLoad -> MqttDataConverter.convertPayload(payLoad, dataType)) 46 .map(this::createEvent) 47 .ifPresent(eventBus::post); 48 } 49 50 private DataAvailableEvent createEvent(Object payObj) { 51 long timestamp = ZonedDateTime.now().toInstant().toEpochMilli(); 52 return new DataAvailableEvent(sourceKey, payObj, timestamp, Collections.emptyMap()); 53 } 54 55 @Override 56 public void connectionLost(Throwable cause) { 57 log.warn("MQTT Source {}: connection lost", sourceKey, cause); 58 } 59 60 @Override 61 public void deliveryComplete(IMqttDeliveryToken token) { 62 log.debug("MQTT Source {}: delivery complete", sourceKey); 63 } 64 65 @Override 66 public void connectComplete(boolean reconnect, String serverURI) { 67 if (!reconnect) { 68 log.info("MQTT Source {}: connecting complete to {}", sourceKey, serverURI); 69 return; 70 } 71 try { 72 log.info("MQTT Source {}: reconnecting to {}", sourceKey, serverURI); 73 start(); 74 } catch (MqttException e) { 75 log.error("MQTT Source {}: problems reconnecting to {}", sourceKey, e, serverURI); 76 } 77 } 78 79 public void close() throws MqttException { 80 client.disconnect(); 81 client.close(); 82 } 83 }