1 package de.dlr.bt.stc.source.mqtt;
2
3 import java.util.Optional;
4
5 import org.eclipse.paho.client.mqttv3.MqttClient;
6 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
7 import org.eclipse.paho.client.mqttv3.MqttException;
8 import org.greenrobot.eventbus.EventBus;
9
10 import de.dlr.bt.stc.source.CredentialsCfg;
11
12 public class MqttFactory {
13
14 private static MqttFactory instance;
15
16 private MqttFactory() {}
17
18 public static MqttFactory getInstance() {
19 if (instance == null) {
20 synchronized (MqttFactory.class) {
21 if (instance == null) {
22 instance = new MqttFactory();
23 }
24 }
25 }
26 return instance;
27 }
28
29 public MqttSubscriber createMqttSubscriber(MqttClient client, SourceMQTTCfg sourceConfig,
30 EventBus eventBus) {
31 return MqttSubscriber.builder()
32 .client(client)
33 .eventBus(eventBus)
34 .sourceKey(sourceConfig.getId())
35 .topic(sourceConfig.getTopic())
36 .qos(sourceConfig.getQos())
37 .dataType(sourceConfig.getDatatype())
38 .build();
39 }
40
41 public MqttClient createMqttClient(String subscriberId, String endpoint) throws MqttException {
42 return new MqttClient(endpoint, subscriberId);
43 }
44
45 public MqttConnectOptions createMqttClientOptions(SourceMQTTCfg sourceConfig) {
46 var options = new MqttConnectOptions();
47 options.setAutomaticReconnect(true);
48 options.setCleanSession(false);
49 options.setConnectionTimeout(30);
50 setCredentials(sourceConfig.getCredentials(), options);
51 return options;
52 }
53
54 private void setCredentials(CredentialsCfg credentials, MqttConnectOptions options) {
55 if (credentials == null) {
56 return;
57 }
58 options.setUserName(credentials.getUsername());
59 options.setPassword(
60 Optional.ofNullable(credentials.getPassword())
61 .map(p -> p.toCharArray())
62 .orElse(null));
63 }
64 }