// MqttFactory.java

package de.dlr.bt.stc.source.mqtt;

import java.util.Optional;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.source.CredentialsCfg;

/**
 * Singleton factory for the objects needed to consume an MQTT source: the Paho
 * {@link MqttClient}, its {@link MqttConnectOptions} and the {@link MqttSubscriber}
 * that is wired to a client and an {@link EventBus}.
 *
 * <p>The factory itself holds no state besides the lazily created singleton instance.
 */
public class MqttFactory {

  /** Connection timeout passed to {@link MqttConnectOptions#setConnectionTimeout(int)}. */
  private static final int CONNECTION_TIMEOUT = 30;

  /**
   * Lazily created singleton. Must be {@code volatile}: {@link #getInstance()} reads this
   * field outside the lock, and without {@code volatile} that read is a data race under
   * the Java Memory Model.
   */
  private static volatile MqttFactory instance;

  /** Not instantiable from outside; use {@link #getInstance()}. */
  private MqttFactory() {}

  /**
   * Returns the shared factory instance, creating it on first use.
   *
   * @return the single {@code MqttFactory} instance
   */
  public static MqttFactory getInstance() {
    // Read the volatile field once into a local so the common (already initialized)
    // path performs a single volatile read.
    MqttFactory result = instance;
    if (result == null) {
      synchronized (MqttFactory.class) {
        // Re-check under the lock: another thread may have initialized it meanwhile.
        result = instance;
        if (result == null) {
          result = new MqttFactory();
          instance = result;
        }
      }
    }
    return result;
  }

  /**
   * Builds a subscriber for the given client, taking source key, topic, QoS and data type
   * from the source configuration.
   *
   * @param client the MQTT client handed to the subscriber
   * @param sourceConfig configuration providing id, topic, QoS and data type
   * @param eventBus the event bus handed to the subscriber
   * @return the newly built subscriber
   */
  public MqttSubscriber createMqttSubscriber(MqttClient client, SourceMQTTCfg sourceConfig,
      EventBus eventBus) {
    return MqttSubscriber.builder()
        .client(client)
        .eventBus(eventBus)
        .sourceKey(sourceConfig.getId())
        .topic(sourceConfig.getTopic())
        .qos(sourceConfig.getQos())
        .dataType(sourceConfig.getDatatype())
        .build();
  }

  /**
   * Creates a new Paho client for the given endpoint.
   *
   * @param subscriberId the client identifier passed to the {@link MqttClient} constructor
   * @param endpoint the server URI passed to the {@link MqttClient} constructor
   * @return the newly created client
   * @throws MqttException if the {@link MqttClient} constructor fails
   */
  public MqttClient createMqttClient(String subscriberId, String endpoint) throws MqttException {
    return new MqttClient(endpoint, subscriberId);
  }

  /**
   * Creates connect options with automatic reconnect enabled, clean session disabled and a
   * fixed connection timeout, and applies the credentials of the source configuration if
   * any are configured.
   *
   * @param sourceConfig configuration whose credentials (possibly {@code null}) are applied
   * @return the populated connect options
   */
  public MqttConnectOptions createMqttClientOptions(SourceMQTTCfg sourceConfig) {
    var options = new MqttConnectOptions();
    options.setAutomaticReconnect(true);
    options.setCleanSession(false);
    options.setConnectionTimeout(CONNECTION_TIMEOUT);
    setCredentials(sourceConfig.getCredentials(), options);
    return options;
  }

  /**
   * Copies user name and password from {@code credentials} into {@code options}. Does
   * nothing when {@code credentials} is {@code null}; a {@code null} password is passed on
   * as {@code null}.
   *
   * @param credentials the credentials to apply, may be {@code null}
   * @param options the connect options to update
   */
  private void setCredentials(CredentialsCfg credentials, MqttConnectOptions options) {
    if (credentials == null) {
      return;
    }
    options.setUserName(credentials.getUsername());
    options.setPassword(
        Optional.ofNullable(credentials.getPassword())
            .map(String::toCharArray)
            .orElse(null));
  }
}