// ATaskExecutorProvider.java
package de.dlr.bt.stc.task;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.entities.TaskLifecycle;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class ATaskExecutorProvider<T extends ITask> extends ATaskProvider<T> {
private final int checkIntervalMs;
private final float threadFactor;
@Setter
private Consumer<T> executorFunction = null;
private final Map<T, ScheduledFuture<?>> executorTasks = new HashMap<>();
private ScheduledExecutorService executor;
private final Random random = new Random();
protected ATaskExecutorProvider(EventBus managementEventBus, int checkInterval, float threadFactor) {
super(managementEventBus);
this.checkIntervalMs = checkInterval;
this.threadFactor = threadFactor;
}
@Override
public boolean initializeTasks() {
boolean ok = super.initializeTasks();
if (executor == null)
executor = Executors.newScheduledThreadPool((int) Math.ceil(taskLifecycles.size() * threadFactor));
return ok;
}
@Override
public void cleanupTasks() {
super.cleanupTasks();
if (executor != null) {
executor.shutdown();
try {
executor.awaitTermination(checkIntervalMs * 2l, TimeUnit.MILLISECONDS);
executor = null;
} catch (InterruptedException e) {
log.error("ATaskExecutorProvider thread was interrupted!");
Thread.currentThread().interrupt();
}
}
}
@Override
protected boolean startTask(TaskLifecycle<T> task) {
boolean ok = super.startTask(task);
if (executorFunction != null) {
var future = executor.scheduleWithFixedDelay(() -> executorFunction.accept(task.getTask()),
random.nextInt(checkIntervalMs), checkIntervalMs, TimeUnit.MILLISECONDS);
executorTasks.put(task.getTask(), future);
}
return ok;
}
@Override
protected void stopTask(TaskLifecycle<T> task) {
super.stopTask(task);
if (executorTasks.containsKey(task.getTask())) {
executorTasks.get(task.getTask()).cancel(false);
}
}
}