STCInstance.java
package de.dlr.bt.stc.init;
import java.nio.file.Path;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import de.dlr.bt.stc.config.ConfigurationManager;
import de.dlr.bt.stc.task.ITaskProvider;
import de.dlr.bt.stc.task.TaskProviderFactory;
import de.dlr.bt.stc.task.TaskProviderSet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class STCInstance {
private final ConfigurationManager configuration;
/**
* Stores the created TaskProviders. Must only be accessed within a synchronized
* method!
*/
private Pair<TaskProviderSet, Boolean> tpfResult = null;
private final EventBus instanceEventBus;
private ScheduledExecutorService autoRestartService = null;
private ScheduledFuture<?> autoRestartTask = null;
/**
* Dqeue holding {@link InstanceEvent} items that need to be processed
* asynchronously
*/
private Deque<InstanceEvent> events = new ConcurrentLinkedDeque<>();
@Getter
private volatile boolean terminate;
/**
* Creates a new Shepard Timeseries Collector (STC) instance
*
* @param managementEventBus An instance of {@link EventBus} which is used for
* central management events.
*/
public STCInstance(EventBus managementEventBus) {
this.instanceEventBus = new EventBus();
this.configuration = new ConfigurationManager(this.instanceEventBus, managementEventBus);
managementEventBus.register(this);
}
private void addConfigurationDirectory(Path path) {
configuration.addConfigurationPath(path);
}
private void unloadConfiguration() {
stopComponents(false);
configuration.unloadConfiguration();
}
private void loadConfiguration() {
configuration.loadConfigurations();
}
private void startComponents(long restartInterval) {
if (autoRestartTask != null)
return;
boolean firstStartSuccessful = startComponentsInternal();
if (!firstStartSuccessful && restartInterval > 0) {
autoRestartService = Executors.newSingleThreadScheduledExecutor();
autoRestartTask = autoRestartService.scheduleAtFixedRate(this::startComponentsInternal, restartInterval,
restartInterval, TimeUnit.MILLISECONDS);
}
}
private synchronized boolean startComponentsInternal() {
log.debug("Starting components start");
if (tpfResult == null)
tpfResult = TaskProviderFactory.getInstance().createInstances(configuration);
var taskProvider = tpfResult.getLeft();
boolean ok = tpfResult.getRight();
ok &= startProviders(taskProvider.getSinks());
ok &= startProviders(taskProvider.getBridges());
ok &= startProviders(taskProvider.getSources());
log.debug("Starting components end");
return ok;
}
private boolean startProviders(List<? extends ITaskProvider> providers) {
boolean ok = true;
for (var tp : providers) {
try {
ok &= tp.initializeTasks();
} catch (RuntimeException ex) {
ok = false;
log.error("An unexpected RuntimeException occured during initializeTasks", ex);
}
}
for (var tp : providers) {
try {
ok &= tp.startTasks();
} catch (RuntimeException ex) {
ok = false;
log.error("An unexpected RuntimeException occured during startTasks", ex);
}
}
return ok;
}
/**
* Stop all components that have been started by this instance.
*/
private void stopComponents(boolean terminate) {
log.info("Stopping all components ...");
if (autoRestartTask != null) {
autoRestartTask.cancel(false);
autoRestartTask = null;
}
if (autoRestartService != null) {
autoRestartService.shutdown();
autoRestartService = null;
}
stopComponentsInternal();
this.terminate = terminate;
}
private synchronized void stopComponentsInternal() {
if (tpfResult == null)
return;
var taskProvider = tpfResult.getLeft();
stopProviders(taskProvider.getSources());
stopProviders(taskProvider.getBridges());
stopProviders(taskProvider.getSinks());
// Delete the reference to all task providers so that a new TaskProviderSet will
// be created when start is called again
tpfResult = null;
}
private void stopProviders(List<? extends ITaskProvider> providers) {
for (var tp : providers) {
try {
tp.stopTasks();
} catch (RuntimeException ex) {
log.error("An unexpected RuntimeException occured during stopTasks", ex);
}
}
for (var tp : providers) {
try {
tp.cleanupTasks();
} catch (RuntimeException ex) {
log.error("An unexpected RuntimeException occured during cleanupTasks", ex);
}
}
}
@Subscribe
public void onInstanceEvent(InstanceEvent ie) {
events.addLast(ie);
synchronized (this) {
this.notifyAll();
}
}
public void run() {
while (!terminate) {
var event = events.pollFirst();
if (event != null) {
event.handle(this);
} else {
synchronized (this) {
try {
this.wait();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
}
/**
* Super class for all events that trigger actions within the
* {@link STCInstance}.
*/
@EqualsAndHashCode
public abstract static class InstanceEvent {
protected abstract void handle(STCInstance instance);
}
@Value
@EqualsAndHashCode(callSuper = true)
public static class StartEvent extends InstanceEvent {
private long autoRestartTime;
protected void handle(STCInstance instance) {
instance.startComponents(getAutoRestartTime());
}
}
@Value
@EqualsAndHashCode(callSuper = true)
public static class StopEvent extends InstanceEvent {
private boolean terminate;
@Override
protected void handle(STCInstance instance) {
instance.stopComponents(isTerminate());
}
}
@Value
@EqualsAndHashCode(callSuper = true)
public static class LoadConfiguration extends InstanceEvent {
@Override
protected void handle(STCInstance instance) {
instance.loadConfiguration();
}
}
@Value
@EqualsAndHashCode(callSuper = true)
public static class UnloadConfiguration extends InstanceEvent {
@Override
protected void handle(STCInstance instance) {
instance.unloadConfiguration();
}
}
@Value
@EqualsAndHashCode(callSuper = true)
public static class AddConfigurationFolder extends InstanceEvent {
private Path folder;
@Override
protected void handle(STCInstance instance) {
instance.addConfigurationDirectory(getFolder());
}
}
}