View Javadoc
1   package de.dlr.bt.stc.source.mqtt;
2   
3   import java.time.ZonedDateTime;
4   import java.util.Collections;
5   import java.util.Optional;
6   
7   import org.eclipse.paho.client.mqttv3.IMqttClient;
8   import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
9   import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
10  import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
11  import org.eclipse.paho.client.mqttv3.MqttException;
12  import org.eclipse.paho.client.mqttv3.MqttMessage;
13  import org.greenrobot.eventbus.EventBus;
14  
15  import de.dlr.bt.stc.eventbus.DataAvailableEvent;
16  import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
17  import lombok.AllArgsConstructor;
18  import lombok.Builder;
19  import lombok.Data;
20  import lombok.extern.slf4j.Slf4j;
21  
22  @Slf4j
23  @AllArgsConstructor
24  @Builder
25  @Data
26  public class MqttSubscriber implements IMqttMessageListener, MqttCallbackExtended {
27  
28    private final IMqttClient client;
29    private final EventBus eventBus;
30    private final String sourceKey;
31    private final int qos;
32    private final String topic;
33    private final SourceDataType dataType;
34  
35    public void start() throws MqttException {
36      client.subscribe(topic, qos, this);
37      client.setCallback(this);
38    }
39  
40    @Override
41    public void messageArrived(String topic, MqttMessage message) throws MqttException {
42      log.debug("MqttSubscriber: message arrived for topic {} and sourceKey {}", topic, sourceKey);
43  
44      Optional.ofNullable(message.getPayload())
45          .map(payLoad -> MqttDataConverter.convertPayload(payLoad, dataType))
46          .map(this::createEvent)
47          .ifPresent(eventBus::post);
48    }
49  
50    private DataAvailableEvent createEvent(Object payObj) {
51      long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();
52      return new DataAvailableEvent(sourceKey, payObj, timestamp, Collections.emptyMap());
53    }
54  
55    @Override
56    public void connectionLost(Throwable cause) {
57      log.warn("MQTT Source {}: connection lost", sourceKey, cause);
58    }
59  
60    @Override
61    public void deliveryComplete(IMqttDeliveryToken token) {
62      log.debug("MQTT Source {}: delivery complete", sourceKey);
63    }
64  
65    @Override
66    public void connectComplete(boolean reconnect, String serverURI) {
67      if (!reconnect) {
68        log.info("MQTT Source {}: connecting complete to {}", sourceKey, serverURI);
69        return;
70      }
71      try {
72        log.info("MQTT Source {}: reconnecting to {}", sourceKey, serverURI);
73        start();
74      } catch (MqttException e) {
75        log.error("MQTT Source {}: problems reconnecting to {}", sourceKey, e, serverURI);
76      }
77    }
78  
79    public void close() throws MqttException {
80      client.disconnect();
81      client.close();
82    }
83  }