ASinkProvider.java

package de.dlr.bt.stc.sink;

import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.entities.TaskLifecycle;
import de.dlr.bt.stc.task.ATaskExecutorProvider;
import de.dlr.bt.stc.task.ISinkProvider;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class ASinkProvider<T extends ASink> extends ATaskExecutorProvider<T> implements ISinkProvider {

	private static final int EXECUTOR_INTERVAL_MS = 1000;
	private static final float SINK_THREAD_FACTOR = 1f / 10f;

	protected ASinkProvider(EventBus managementEventBus) {
		super(managementEventBus, EXECUTOR_INTERVAL_MS, SINK_THREAD_FACTOR);
		setExecutorFunction(this::runTask);
	}

	protected void putSink(String key, T sink) {
		taskLifecycles.put(key, new TaskLifecycle<>(sink));
	}

	private void runTask(T sink) {
		log.debug("Triggered cache task for sink {}", sink);
		try {
			sink.handleEvents();
		} catch (Exception ex) {
			log.error("Exception while running task for sink {}: {}", sink, ex);
		}
	}

}