1 package de.dlr.bt.stc.source.mqtt;
2
3 import java.time.ZonedDateTime;
4 import java.util.Collections;
5 import java.util.Optional;
6
7 import org.eclipse.paho.client.mqttv3.IMqttClient;
8 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
9 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
10 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
11 import org.eclipse.paho.client.mqttv3.MqttException;
12 import org.eclipse.paho.client.mqttv3.MqttMessage;
13 import org.greenrobot.eventbus.EventBus;
14
15 import de.dlr.bt.stc.eventbus.DataAvailableEvent;
16 import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
17 import lombok.AllArgsConstructor;
18 import lombok.Builder;
19 import lombok.Data;
20 import lombok.extern.slf4j.Slf4j;
21
22 @Slf4j
23 @AllArgsConstructor
24 @Builder
25 @Data
26 public class MqttSubscriber implements IMqttMessageListener, MqttCallbackExtended {
27
28 private final IMqttClient client;
29 private final EventBus eventBus;
30 private final String sourceKey;
31 private final int qos;
32 private final String topic;
33 private final SourceDataType dataType;
34
35 public void start() throws MqttException {
36 client.subscribe(topic, qos, this);
37 client.setCallback(this);
38 }
39
40 @Override
41 public void messageArrived(String topic, MqttMessage message) throws MqttException {
42 log.debug("MqttSubscriber: message arrived for topic {} and sourceKey {}", topic, sourceKey);
43
44 Optional.ofNullable(message.getPayload())
45 .map(payLoad -> MqttDataConverter.convertPayload(payLoad, dataType))
46 .map(this::createEvent)
47 .ifPresent(eventBus::post);
48 }
49
50 private DataAvailableEvent createEvent(Object payObj) {
51 long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();
52 return new DataAvailableEvent(sourceKey, payObj, timestamp, Collections.emptyMap());
53 }
54
55 @Override
56 public void connectionLost(Throwable cause) {
57 log.warn("MQTT Source {}: connection lost", sourceKey, cause);
58 }
59
60 @Override
61 public void deliveryComplete(IMqttDeliveryToken token) {
62 log.debug("MQTT Source {}: delivery complete", sourceKey);
63 }
64
65 @Override
66 public void connectComplete(boolean reconnect, String serverURI) {
67 if (!reconnect) {
68 log.info("MQTT Source {}: connecting complete to {}", sourceKey, serverURI);
69 return;
70 }
71 try {
72 log.info("MQTT Source {}: reconnecting to {}", sourceKey, serverURI);
73 start();
74 } catch (MqttException e) {
75 log.error("MQTT Source {}: problems reconnecting to {}", sourceKey, e, serverURI);
76 }
77 }
78
79 public void close() throws MqttException {
80 client.disconnect();
81 client.close();
82 }
83 }