BridgeProvider.java

package de.dlr.bt.stc.bridge;

import de.dlr.bt.stc.config.ConfigurationManager;
import de.dlr.bt.stc.entities.TaskLifecycle;
import de.dlr.bt.stc.init.Register;
import de.dlr.bt.stc.sink.ASinkCfg;
import de.dlr.bt.stc.source.ASourceCfg;
import de.dlr.bt.stc.task.ATaskExecutorProvider;
import de.dlr.bt.stc.task.IBridgeProvider;
import de.dlr.bt.stc.task.TaskProviderFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BridgeProvider extends ATaskExecutorProvider<Bridge> implements IBridgeProvider {

	private static final int CHECK_INTERVAL_MS = 1000;
	private static final float BRIDGE_THREAD_FACTOR = 1f / 10f;

	@Register
	public static void register() {
		TaskProviderFactory.getInstance().registerCreator(BridgeCfg.class, BridgeProvider::new);
	}

	private BridgeProvider(ConfigurationManager cfg) {
		super(cfg.getManagementEventBus(), CHECK_INTERVAL_MS, BRIDGE_THREAD_FACTOR);
		for (var entry : cfg.getConfigurations().entrySet()) {
			if (entry.getValue() instanceof BridgeCfg bcfg) {
				Bridge bg = new Bridge(entry.getKey(), bcfg, cfg.getInstanceEventBus());
				taskLifecycles.put(entry.getKey(), new TaskLifecycle<>(bg));
				checkBridgeConfiguration(bcfg, cfg);
			}
		}
		setExecutorFunction(this::runTask);
	}

	private static void checkBridgeConfiguration(BridgeCfg bridgeCfg, ConfigurationManager cfgMan) {
		var configurations = cfgMan.getConfigurations();
		var sourcecfg = configurations.get(bridgeCfg.getSourceId());
		if (!(sourcecfg instanceof ASourceCfg)) {
			log.warn("Bridge {} did not find sourceID {} - possible misconfiguration!", bridgeCfg.getId(),
					bridgeCfg.getSourceId());
		}
		var sinkcfg = configurations.get(bridgeCfg.getSinkId());
		if (!(sinkcfg instanceof ASinkCfg)) {
			log.warn("Bridge {} did not find sinkID {} - possible misconfiguration!", bridgeCfg.getId(),
					bridgeCfg.getSinkId());
		}
	}

	private void runTask(Bridge bridge) {
		log.debug("Triggered cache task for bridge {}", bridge);
		try {
			if (bridge.cacheFull()) {
				bridge.clearCache();
			}
		} catch (Exception ex) {
			log.error("Exception while running task for bridge {}: {}", bridge, ex);
		}
	}

}