ATaskExecutorProvider.java

package de.dlr.bt.stc.task;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.entities.TaskLifecycle;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class ATaskExecutorProvider<T extends ITask> extends ATaskProvider<T> {

	private final int checkIntervalMs;
	private final float threadFactor;

	@Setter
	private Consumer<T> executorFunction = null;

	private final Map<T, ScheduledFuture<?>> executorTasks = new HashMap<>();

	private ScheduledExecutorService executor;
	private final Random random = new Random();

	protected ATaskExecutorProvider(EventBus managementEventBus, int checkInterval, float threadFactor) {
		super(managementEventBus);
		this.checkIntervalMs = checkInterval;
		this.threadFactor = threadFactor;
	}

	@Override
	public boolean initializeTasks() {
		boolean ok = super.initializeTasks();

		if (executor == null)
			executor = Executors.newScheduledThreadPool((int) Math.ceil(taskLifecycles.size() * threadFactor));
		return ok;
	}

	@Override
	public void cleanupTasks() {
		super.cleanupTasks();

		if (executor != null) {
			executor.shutdown();
			try {
				executor.awaitTermination(checkIntervalMs * 2l, TimeUnit.MILLISECONDS);
				executor = null;
			} catch (InterruptedException e) {
				log.error("ATaskExecutorProvider thread was interrupted!");
				Thread.currentThread().interrupt();
			}
		}
	}

	@Override
	protected boolean startTask(TaskLifecycle<T> task) {
		boolean ok = super.startTask(task);
		if (executorFunction != null) {
			var future = executor.scheduleWithFixedDelay(() -> executorFunction.accept(task.getTask()),
					random.nextInt(checkIntervalMs), checkIntervalMs, TimeUnit.MILLISECONDS);
			executorTasks.put(task.getTask(), future);
		}
		return ok;
	}

	@Override
	protected void stopTask(TaskLifecycle<T> task) {
		super.stopTask(task);
		if (executorTasks.containsKey(task.getTask())) {
			executorTasks.get(task.getTask()).cancel(false);
		}

	}

}