package de.dlr.bt.stc.source.mqtt;

import java.util.Optional;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.source.CredentialsCfg;

/**
 * Lazily created singleton factory for the MQTT objects used by an MQTT source:
 * clients, connect options and subscribers.
 */
public class MqttFactory {

	/**
	 * The singleton instance. Declared {@code volatile} so that the unsynchronized
	 * first read in {@link #getInstance()} is safe: without it, the lazy
	 * check-lock-check initialization below is not guaranteed to be correct under
	 * the Java Memory Model.
	 */
	private static volatile MqttFactory instance;

	private MqttFactory() {
	}

	/**
	 * Returns the shared factory, creating it on first use.
	 *
	 * @return the single {@code MqttFactory} instance
	 */
	public static MqttFactory getInstance() {
		// Read the volatile field once into a local so the common (already
		// initialized) path performs a single volatile read.
		MqttFactory current = instance;
		if (current == null) {
			synchronized (MqttFactory.class) {
				// Re-check under the lock: another thread may have created the
				// instance between the first read and lock acquisition.
				current = instance;
				if (current == null) {
					current = new MqttFactory();
					instance = current;
				}
			}
		}
		return current;
	}

	/**
	 * Builds an {@link MqttSubscriber} from the given client, event bus and source
	 * configuration.
	 *
	 * @param client       the MQTT client handed to the subscriber
	 * @param sourceConfig configuration supplying the source id (used as source
	 *                     key), topic, QoS and data type
	 * @param eventBus     the event bus handed to the subscriber
	 * @return the newly built subscriber
	 */
	public MqttSubscriber createMqttSubscriber(MqttClient client, SourceMQTTCfg sourceConfig,
			EventBus eventBus) {
		return MqttSubscriber.builder()
				.client(client)
				.eventBus(eventBus)
				.sourceKey(sourceConfig.getId())
				.topic(sourceConfig.getTopic())
				.qos(sourceConfig.getQos())
				.dataType(sourceConfig.getDatatype())
				.build();
	}

	/**
	 * Creates a new {@link MqttClient}.
	 *
	 * @param subscriberId the client identifier passed to the Paho client
	 * @param endpoint     the server URI passed to the Paho client
	 * @return the newly created client
	 * @throws MqttException if the Paho client cannot be constructed
	 */
	public MqttClient createMqttClient(String subscriberId, String endpoint) throws MqttException {
		return new MqttClient(endpoint, subscriberId);
	}

	/**
	 * Creates connect options with automatic reconnect enabled, clean session
	 * disabled and a connection timeout of 30 (seconds, per the Paho API), and
	 * applies the credentials from the source configuration if any are present.
	 *
	 * @param sourceConfig configuration whose credentials (possibly {@code null})
	 *                     are applied to the options
	 * @return the configured connect options
	 */
	public MqttConnectOptions createMqttClientOptions(SourceMQTTCfg sourceConfig) {
		var options = new MqttConnectOptions();
		options.setAutomaticReconnect(true);
		options.setCleanSession(false);
		options.setConnectionTimeout(30);
		setCredentials(sourceConfig.getCredentials(), options);
		return options;
	}

	/**
	 * Copies user name and password from {@code credentials} onto {@code options}.
	 * Does nothing when {@code credentials} is {@code null}; a {@code null}
	 * password is passed through as {@code null}.
	 */
	private void setCredentials(CredentialsCfg credentials, MqttConnectOptions options) {
		if (credentials == null) {
			return;
		}
		options.setUserName(credentials.getUsername());
		options.setPassword(
				Optional.ofNullable(credentials.getPassword())
						.map(String::toCharArray)
						.orElse(null));
	}
}