View Javadoc
1   package de.dlr.bt.stc.init;
2   
3   import java.nio.file.Path;
4   import java.util.Deque;
5   import java.util.List;
6   import java.util.concurrent.ConcurrentLinkedDeque;
7   import java.util.concurrent.Executors;
8   import java.util.concurrent.ScheduledExecutorService;
9   import java.util.concurrent.ScheduledFuture;
10  import java.util.concurrent.TimeUnit;
11  
12  import org.apache.commons.lang3.tuple.Pair;
13  import org.greenrobot.eventbus.EventBus;
14  import org.greenrobot.eventbus.Subscribe;
15  
16  import de.dlr.bt.stc.config.ConfigurationManager;
17  import de.dlr.bt.stc.task.ITaskProvider;
18  import de.dlr.bt.stc.task.TaskProviderFactory;
19  import de.dlr.bt.stc.task.TaskProviderSet;
20  import lombok.EqualsAndHashCode;
21  import lombok.Getter;
22  import lombok.Value;
23  import lombok.extern.slf4j.Slf4j;
24  
25  @Slf4j
26  public final class STCInstance {
27  	private final ConfigurationManager configuration;
28  
29  	/**
30  	 * Stores the created TaskProviders. Must only be accessed within a synchronized
31  	 * method!
32  	 */
33  	private Pair<TaskProviderSet, Boolean> tpfResult = null;
34  
35  	private final EventBus instanceEventBus;
36  
37  	private ScheduledExecutorService autoRestartService = null;
38  	private ScheduledFuture<?> autoRestartTask = null;
39  
40  	/**
41  	 * Dqeue holding {@link InstanceEvent} items that need to be processed
42  	 * asynchronously
43  	 */
44  	private Deque<InstanceEvent> events = new ConcurrentLinkedDeque<>();
45  
46  	@Getter
47  	private volatile boolean terminate;
48  
49  	/**
50  	 * Creates a new Shepard Timeseries Collector (STC) instance
51  	 * 
52  	 * @param managementEventBus An instance of {@link EventBus} which is used for
53  	 *                           central management events.
54  	 */
55  	public STCInstance(EventBus managementEventBus) {
56  		this.instanceEventBus = new EventBus();
57  		this.configuration = new ConfigurationManager(this.instanceEventBus, managementEventBus);
58  
59  		managementEventBus.register(this);
60  	}
61  
62  	private void addConfigurationDirectory(Path path) {
63  		configuration.addConfigurationPath(path);
64  	}
65  
66  	private void unloadConfiguration() {
67  		stopComponents(false);
68  		configuration.unloadConfiguration();
69  	}
70  
71  	private void loadConfiguration() {
72  		configuration.loadConfigurations();
73  	}
74  
75  	private void startComponents(long restartInterval) {
76  		if (autoRestartTask != null)
77  			return;
78  
79  		boolean firstStartSuccessful = startComponentsInternal();
80  
81  		if (!firstStartSuccessful && restartInterval > 0) {
82  			autoRestartService = Executors.newSingleThreadScheduledExecutor();
83  			autoRestartTask = autoRestartService.scheduleAtFixedRate(this::startComponentsInternal, restartInterval,
84  					restartInterval, TimeUnit.MILLISECONDS);
85  
86  		}
87  	}
88  
89  	private synchronized boolean startComponentsInternal() {
90  		log.debug("Starting components start");
91  		if (tpfResult == null)
92  			tpfResult = TaskProviderFactory.getInstance().createInstances(configuration);
93  
94  		var taskProvider = tpfResult.getLeft();
95  		boolean ok = tpfResult.getRight();
96  
97  		ok &= startProviders(taskProvider.getSinks());
98  		ok &= startProviders(taskProvider.getBridges());
99  		ok &= startProviders(taskProvider.getSources());
100 
101 		log.debug("Starting components end");
102 		return ok;
103 	}
104 
105 	private boolean startProviders(List<? extends ITaskProvider> providers) {
106 		boolean ok = true;
107 		for (var tp : providers) {
108 			try {
109 				ok &= tp.initializeTasks();
110 			} catch (RuntimeException ex) {
111 				ok = false;
112 				log.error("An unexpected RuntimeException occured during initializeTasks", ex);
113 			}
114 		}
115 		for (var tp : providers) {
116 			try {
117 				ok &= tp.startTasks();
118 			} catch (RuntimeException ex) {
119 				ok = false;
120 				log.error("An unexpected RuntimeException occured during startTasks", ex);
121 			}
122 		}
123 		return ok;
124 	}
125 
126 	/**
127 	 * Stop all components that have been started by this instance.
128 	 */
129 	private void stopComponents(boolean terminate) {
130 		log.info("Stopping all components ...");
131 		if (autoRestartTask != null) {
132 			autoRestartTask.cancel(false);
133 			autoRestartTask = null;
134 		}
135 		if (autoRestartService != null) {
136 			autoRestartService.shutdown();
137 			autoRestartService = null;
138 		}
139 
140 		stopComponentsInternal();
141 
142 		this.terminate = terminate;
143 	}
144 
145 	private synchronized void stopComponentsInternal() {
146 		if (tpfResult == null)
147 			return;
148 
149 		var taskProvider = tpfResult.getLeft();
150 
151 		stopProviders(taskProvider.getSources());
152 		stopProviders(taskProvider.getBridges());
153 		stopProviders(taskProvider.getSinks());
154 
155 		// Delete the reference to all task providers so that a new TaskProviderSet will
156 		// be created when start is called again
157 		tpfResult = null;
158 	}
159 
160 	private void stopProviders(List<? extends ITaskProvider> providers) {
161 		for (var tp : providers) {
162 			try {
163 				tp.stopTasks();
164 			} catch (RuntimeException ex) {
165 				log.error("An unexpected RuntimeException occured during stopTasks", ex);
166 			}
167 
168 		}
169 		for (var tp : providers) {
170 			try {
171 				tp.cleanupTasks();
172 			} catch (RuntimeException ex) {
173 				log.error("An unexpected RuntimeException occured during cleanupTasks", ex);
174 			}
175 
176 		}
177 	}
178 
179 	@Subscribe
180 	public void onInstanceEvent(InstanceEvent ie) {
181 		events.addLast(ie);
182 		synchronized (this) {
183 			this.notifyAll();
184 		}
185 	}
186 
187 	public void run() {
188 		while (!terminate) {
189 			var event = events.pollFirst();
190 			if (event != null) {
191 				event.handle(this);
192 			} else {
193 				synchronized (this) {
194 					try {
195 						this.wait();
196 					} catch (InterruptedException ie) {
197 						Thread.currentThread().interrupt();
198 					}
199 				}
200 			}
201 		}
202 	}
203 
204 	/**
205 	 * Super class for all events that trigger actions within the
206 	 * {@link STCInstance}.
207 	 */
208 	@EqualsAndHashCode
209 	public abstract static class InstanceEvent {
210 		protected abstract void handle(STCInstance instance);
211 	}
212 
213 	@Value
214 	@EqualsAndHashCode(callSuper = true)
215 	public static class StartEvent extends InstanceEvent {
216 		private long autoRestartTime;
217 
218 		protected void handle(STCInstance instance) {
219 			instance.startComponents(getAutoRestartTime());
220 		}
221 	}
222 
223 	@Value
224 	@EqualsAndHashCode(callSuper = true)
225 	public static class StopEvent extends InstanceEvent {
226 		private boolean terminate;
227 
228 		@Override
229 		protected void handle(STCInstance instance) {
230 			instance.stopComponents(isTerminate());
231 		}
232 	}
233 
234 	@Value
235 	@EqualsAndHashCode(callSuper = true)
236 	public static class LoadConfiguration extends InstanceEvent {
237 		@Override
238 		protected void handle(STCInstance instance) {
239 			instance.loadConfiguration();
240 		}
241 	}
242 
243 	@Value
244 	@EqualsAndHashCode(callSuper = true)
245 	public static class UnloadConfiguration extends InstanceEvent {
246 		@Override
247 		protected void handle(STCInstance instance) {
248 			instance.unloadConfiguration();
249 		}
250 	}
251 
252 	@Value
253 	@EqualsAndHashCode(callSuper = true)
254 	public static class AddConfigurationFolder extends InstanceEvent {
255 		private Path folder;
256 
257 		@Override
258 		protected void handle(STCInstance instance) {
259 			instance.addConfigurationDirectory(getFolder());
260 		}
261 	}
262 
263 }