// STCInstance.java

package de.dlr.bt.stc.init;

import java.nio.file.Path;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.Pair;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;

import de.dlr.bt.stc.config.ConfigurationManager;
import de.dlr.bt.stc.task.ITaskProvider;
import de.dlr.bt.stc.task.TaskProviderFactory;
import de.dlr.bt.stc.task.TaskProviderSet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public final class STCInstance {
	private final ConfigurationManager configuration;

	/**
	 * Stores the created TaskProviders. Must only be accessed within a synchronized
	 * method!
	 */
	private Pair<TaskProviderSet, Boolean> tpfResult = null;

	private final EventBus instanceEventBus;

	private ScheduledExecutorService autoRestartService = null;
	private ScheduledFuture<?> autoRestartTask = null;

	/**
	 * Dqeue holding {@link InstanceEvent} items that need to be processed
	 * asynchronously
	 */
	private Deque<InstanceEvent> events = new ConcurrentLinkedDeque<>();

	@Getter
	private volatile boolean terminate;

	/**
	 * Creates a new Shepard Timeseries Collector (STC) instance
	 * 
	 * @param managementEventBus An instance of {@link EventBus} which is used for
	 *                           central management events.
	 */
	public STCInstance(EventBus managementEventBus) {
		this.instanceEventBus = new EventBus();
		this.configuration = new ConfigurationManager(this.instanceEventBus, managementEventBus);

		managementEventBus.register(this);
	}

	private void addConfigurationDirectory(Path path) {
		configuration.addConfigurationPath(path);
	}

	private void unloadConfiguration() {
		stopComponents(false);
		configuration.unloadConfiguration();
	}

	private void loadConfiguration() {
		configuration.loadConfigurations();
	}

	private void startComponents(long restartInterval) {
		if (autoRestartTask != null)
			return;

		boolean firstStartSuccessful = startComponentsInternal();

		if (!firstStartSuccessful && restartInterval > 0) {
			autoRestartService = Executors.newSingleThreadScheduledExecutor();
			autoRestartTask = autoRestartService.scheduleAtFixedRate(this::startComponentsInternal, restartInterval,
					restartInterval, TimeUnit.MILLISECONDS);

		}
	}

	private synchronized boolean startComponentsInternal() {
		log.debug("Starting components start");
		if (tpfResult == null)
			tpfResult = TaskProviderFactory.getInstance().createInstances(configuration);

		var taskProvider = tpfResult.getLeft();
		boolean ok = tpfResult.getRight();

		ok &= startProviders(taskProvider.getSinks());
		ok &= startProviders(taskProvider.getBridges());
		ok &= startProviders(taskProvider.getSources());

		log.debug("Starting components end");
		return ok;
	}

	private boolean startProviders(List<? extends ITaskProvider> providers) {
		boolean ok = true;
		for (var tp : providers) {
			try {
				ok &= tp.initializeTasks();
			} catch (RuntimeException ex) {
				ok = false;
				log.error("An unexpected RuntimeException occured during initializeTasks", ex);
			}
		}
		for (var tp : providers) {
			try {
				ok &= tp.startTasks();
			} catch (RuntimeException ex) {
				ok = false;
				log.error("An unexpected RuntimeException occured during startTasks", ex);
			}
		}
		return ok;
	}

	/**
	 * Stop all components that have been started by this instance.
	 */
	private void stopComponents(boolean terminate) {
		log.info("Stopping all components ...");
		if (autoRestartTask != null) {
			autoRestartTask.cancel(false);
			autoRestartTask = null;
		}
		if (autoRestartService != null) {
			autoRestartService.shutdown();
			autoRestartService = null;
		}

		stopComponentsInternal();

		this.terminate = terminate;
	}

	private synchronized void stopComponentsInternal() {
		if (tpfResult == null)
			return;

		var taskProvider = tpfResult.getLeft();

		stopProviders(taskProvider.getSources());
		stopProviders(taskProvider.getBridges());
		stopProviders(taskProvider.getSinks());

		// Delete the reference to all task providers so that a new TaskProviderSet will
		// be created when start is called again
		tpfResult = null;
	}

	private void stopProviders(List<? extends ITaskProvider> providers) {
		for (var tp : providers) {
			try {
				tp.stopTasks();
			} catch (RuntimeException ex) {
				log.error("An unexpected RuntimeException occured during stopTasks", ex);
			}

		}
		for (var tp : providers) {
			try {
				tp.cleanupTasks();
			} catch (RuntimeException ex) {
				log.error("An unexpected RuntimeException occured during cleanupTasks", ex);
			}

		}
	}

	@Subscribe
	public void onInstanceEvent(InstanceEvent ie) {
		events.addLast(ie);
		synchronized (this) {
			this.notifyAll();
		}
	}

	public void run() {
		while (!terminate) {
			var event = events.pollFirst();
			if (event != null) {
				event.handle(this);
			} else {
				synchronized (this) {
					try {
						this.wait();
					} catch (InterruptedException ie) {
						Thread.currentThread().interrupt();
					}
				}
			}
		}
	}

	/**
	 * Super class for all events that trigger actions within the
	 * {@link STCInstance}.
	 */
	@EqualsAndHashCode
	public abstract static class InstanceEvent {
		protected abstract void handle(STCInstance instance);
	}

	@Value
	@EqualsAndHashCode(callSuper = true)
	public static class StartEvent extends InstanceEvent {
		private long autoRestartTime;

		protected void handle(STCInstance instance) {
			instance.startComponents(getAutoRestartTime());
		}
	}

	@Value
	@EqualsAndHashCode(callSuper = true)
	public static class StopEvent extends InstanceEvent {
		private boolean terminate;

		@Override
		protected void handle(STCInstance instance) {
			instance.stopComponents(isTerminate());
		}
	}

	@Value
	@EqualsAndHashCode(callSuper = true)
	public static class LoadConfiguration extends InstanceEvent {
		@Override
		protected void handle(STCInstance instance) {
			instance.loadConfiguration();
		}
	}

	@Value
	@EqualsAndHashCode(callSuper = true)
	public static class UnloadConfiguration extends InstanceEvent {
		@Override
		protected void handle(STCInstance instance) {
			instance.unloadConfiguration();
		}
	}

	@Value
	@EqualsAndHashCode(callSuper = true)
	public static class AddConfigurationFolder extends InstanceEvent {
		private Path folder;

		@Override
		protected void handle(STCInstance instance) {
			instance.addConfigurationDirectory(getFolder());
		}
	}

}