MqttFactory.java
package de.dlr.bt.stc.source.mqtt;
import java.util.Optional;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.source.CredentialsCfg;
public class MqttFactory {
private static MqttFactory instance;
private MqttFactory() {}
public static MqttFactory getInstance() {
if (instance == null) {
synchronized (MqttFactory.class) {
if (instance == null) {
instance = new MqttFactory();
}
}
}
return instance;
}
public MqttSubscriber createMqttSubscriber(MqttClient client, SourceMQTTCfg sourceConfig,
EventBus eventBus) {
return MqttSubscriber.builder()
.client(client)
.eventBus(eventBus)
.sourceKey(sourceConfig.getId())
.topic(sourceConfig.getTopic())
.qos(sourceConfig.getQos())
.dataType(sourceConfig.getDatatype())
.build();
}
public MqttClient createMqttClient(String subscriberId, String endpoint) throws MqttException {
return new MqttClient(endpoint, subscriberId);
}
public MqttConnectOptions createMqttClientOptions(SourceMQTTCfg sourceConfig) {
var options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setConnectionTimeout(30);
setCredentials(sourceConfig.getCredentials(), options);
return options;
}
private void setCredentials(CredentialsCfg credentials, MqttConnectOptions options) {
if (credentials == null) {
return;
}
options.setUserName(credentials.getUsername());
options.setPassword(
Optional.ofNullable(credentials.getPassword())
.map(p -> p.toCharArray())
.orElse(null));
}
}