MQTTSourceProvider.java

package de.dlr.bt.stc.source.mqtt;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.config.ACfg;
import de.dlr.bt.stc.config.ConfigurationManager;
import de.dlr.bt.stc.entities.TaskLifecycle;
import de.dlr.bt.stc.init.Register;
import de.dlr.bt.stc.source.ASourceProvider;
import de.dlr.bt.stc.task.TaskProviderFactory;

public class MQTTSourceProvider extends ASourceProvider<MqttSource> {

  private final ConfigurationManager cfg;
  private final EventBus instanceEventBus;
  private MqttFactory mqttFactory;

  @Register
  public static void register() {
    TaskProviderFactory.getInstance().registerCreator(SourceMQTTCfg.class, MQTTSourceProvider::new);
  }

  public MQTTSourceProvider(ConfigurationManager cfg) {
    super(cfg.getManagementEventBus());
    this.cfg = cfg;
    this.instanceEventBus = cfg.getInstanceEventBus();
    this.mqttFactory = MqttFactory.getInstance();
  }

  public MQTTSourceProvider(ConfigurationManager cfg, MqttFactory mqttFactory) {
    this(cfg);
    this.mqttFactory = mqttFactory;
  }

  @Override
  public boolean initializeTasks() {
    createSourcesForMqttConfigEntries()
        .forEach(
            mqttSource -> taskLifecycles.put(mqttSource.getTaskId(),
                new TaskLifecycle<>(mqttSource)));
    return super.initializeTasks();
  }

  private List<MqttSource> createSourcesForMqttConfigEntries() {
    List<SourceMQTTCfg> allMqttConfigs = findAllMqttConfigs(cfg.getConfigurations());
    
    Map<String, MqttClient> endpointToClient = createSharedClientsBasedOnEndpoints(allMqttConfigs);
   
    return allMqttConfigs
        .stream()
        .map(conf -> createMqttSource(conf, endpointToClient))
        .toList();
  }

  private Map<String, MqttClient> createSharedClientsBasedOnEndpoints(
      List<SourceMQTTCfg> allMqttConfigs) {
    return allMqttConfigs
      .stream()
      .map(SourceMQTTCfg::getEndpoint)
      .distinct()
      .collect(Collectors.toMap(endp -> endp, this::createClient));
  }

  private MqttClient createClient(String endpoint) {
    try {
      var subscriberId = "shepardTimeseriesCollector-" + UUID.randomUUID();
      return mqttFactory.createMqttClient(subscriberId, endpoint);
    } catch (MqttException e) {
      throw new RuntimeException("Problems creating mqtt client client for Endpoint " + endpoint, e);
    }    
  }
  
  private List<SourceMQTTCfg> findAllMqttConfigs(Map<String, ACfg> configurations) {
    return configurations.entrySet()
        .stream()
        .map(Entry::getValue)
        .filter(config -> config instanceof SourceMQTTCfg)
        .map(conf -> (SourceMQTTCfg) conf)
        .toList();
  }

  private MqttSource createMqttSource(SourceMQTTCfg config, Map<String, MqttClient> endpointToClient) {
    return MqttSource.builder()
        .taskId(config.getId())
        .mqttClient(endpointToClient.get(config.getEndpoint()))
        .mqttFactory(mqttFactory)
        .sourceConfig((SourceMQTTCfg) config)
        .instanceEventBus(instanceEventBus)
        .build();
  }

}