MQTTSourceProvider.java
package de.dlr.bt.stc.source.mqtt;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.config.ACfg;
import de.dlr.bt.stc.config.ConfigurationManager;
import de.dlr.bt.stc.entities.TaskLifecycle;
import de.dlr.bt.stc.init.Register;
import de.dlr.bt.stc.source.ASourceProvider;
import de.dlr.bt.stc.task.TaskProviderFactory;
public class MQTTSourceProvider extends ASourceProvider<MqttSource> {
private final ConfigurationManager cfg;
private final EventBus instanceEventBus;
private MqttFactory mqttFactory;
@Register
public static void register() {
TaskProviderFactory.getInstance().registerCreator(SourceMQTTCfg.class, MQTTSourceProvider::new);
}
public MQTTSourceProvider(ConfigurationManager cfg) {
super(cfg.getManagementEventBus());
this.cfg = cfg;
this.instanceEventBus = cfg.getInstanceEventBus();
this.mqttFactory = MqttFactory.getInstance();
}
public MQTTSourceProvider(ConfigurationManager cfg, MqttFactory mqttFactory) {
this(cfg);
this.mqttFactory = mqttFactory;
}
@Override
public boolean initializeTasks() {
createSourcesForMqttConfigEntries()
.forEach(
mqttSource -> taskLifecycles.put(mqttSource.getTaskId(),
new TaskLifecycle<>(mqttSource)));
return super.initializeTasks();
}
private List<MqttSource> createSourcesForMqttConfigEntries() {
List<SourceMQTTCfg> allMqttConfigs = findAllMqttConfigs(cfg.getConfigurations());
Map<String, MqttClient> endpointToClient = createSharedClientsBasedOnEndpoints(allMqttConfigs);
return allMqttConfigs
.stream()
.map(conf -> createMqttSource(conf, endpointToClient))
.toList();
}
private Map<String, MqttClient> createSharedClientsBasedOnEndpoints(
List<SourceMQTTCfg> allMqttConfigs) {
return allMqttConfigs
.stream()
.map(SourceMQTTCfg::getEndpoint)
.distinct()
.collect(Collectors.toMap(endp -> endp, this::createClient));
}
private MqttClient createClient(String endpoint) {
try {
var subscriberId = "shepardTimeseriesCollector-" + UUID.randomUUID();
return mqttFactory.createMqttClient(subscriberId, endpoint);
} catch (MqttException e) {
throw new RuntimeException("Problems creating mqtt client client for Endpoint " + endpoint, e);
}
}
private List<SourceMQTTCfg> findAllMqttConfigs(Map<String, ACfg> configurations) {
return configurations.entrySet()
.stream()
.map(Entry::getValue)
.filter(config -> config instanceof SourceMQTTCfg)
.map(conf -> (SourceMQTTCfg) conf)
.toList();
}
private MqttSource createMqttSource(SourceMQTTCfg config, Map<String, MqttClient> endpointToClient) {
return MqttSource.builder()
.taskId(config.getId())
.mqttClient(endpointToClient.get(config.getEndpoint()))
.mqttFactory(mqttFactory)
.sourceConfig((SourceMQTTCfg) config)
.instanceEventBus(instanceEventBus)
.build();
}
}