ASinkProvider.java
package de.dlr.bt.stc.sink;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.entities.TaskLifecycle;
import de.dlr.bt.stc.task.ATaskExecutorProvider;
import de.dlr.bt.stc.task.ISinkProvider;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class ASinkProvider<T extends ASink> extends ATaskExecutorProvider<T> implements ISinkProvider {
private static final int EXECUTOR_INTERVAL_MS = 1000;
private static final float SINK_THREAD_FACTOR = 1f / 10f;
protected ASinkProvider(EventBus managementEventBus) {
super(managementEventBus, EXECUTOR_INTERVAL_MS, SINK_THREAD_FACTOR);
setExecutorFunction(this::runTask);
}
protected void putSink(String key, T sink) {
taskLifecycles.put(key, new TaskLifecycle<>(sink));
}
private void runTask(T sink) {
log.debug("Triggered cache task for sink {}", sink);
try {
sink.handleEvents();
} catch (Exception ex) {
log.error("Exception while running task for sink {}: {}", sink, ex);
}
}
}