1 package de.dlr.bt.stc.init;
2
3 import java.nio.file.Path;
4 import java.util.Deque;
5 import java.util.List;
6 import java.util.concurrent.ConcurrentLinkedDeque;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.ScheduledExecutorService;
9 import java.util.concurrent.ScheduledFuture;
10 import java.util.concurrent.TimeUnit;
11
12 import org.apache.commons.lang3.tuple.Pair;
13 import org.greenrobot.eventbus.EventBus;
14 import org.greenrobot.eventbus.Subscribe;
15
16 import de.dlr.bt.stc.config.ConfigurationManager;
17 import de.dlr.bt.stc.task.ITaskProvider;
18 import de.dlr.bt.stc.task.TaskProviderFactory;
19 import de.dlr.bt.stc.task.TaskProviderSet;
20 import lombok.EqualsAndHashCode;
21 import lombok.Getter;
22 import lombok.Value;
23 import lombok.extern.slf4j.Slf4j;
24
25 @Slf4j
26 public final class STCInstance {
27 private final ConfigurationManager configuration;
28
29
30
31
32
33 private Pair<TaskProviderSet, Boolean> tpfResult = null;
34
35 private final EventBus instanceEventBus;
36
37 private ScheduledExecutorService autoRestartService = null;
38 private ScheduledFuture<?> autoRestartTask = null;
39
40
41
42
43
44 private Deque<InstanceEvent> events = new ConcurrentLinkedDeque<>();
45
46 @Getter
47 private volatile boolean terminate;
48
49
50
51
52
53
54
55 public STCInstance(EventBus managementEventBus) {
56 this.instanceEventBus = new EventBus();
57 this.configuration = new ConfigurationManager(this.instanceEventBus, managementEventBus);
58
59 managementEventBus.register(this);
60 }
61
62 private void addConfigurationDirectory(Path path) {
63 configuration.addConfigurationPath(path);
64 }
65
66 private void unloadConfiguration() {
67 stopComponents(false);
68 configuration.unloadConfiguration();
69 }
70
71 private void loadConfiguration() {
72 configuration.loadConfigurations();
73 }
74
75 private void startComponents(long restartInterval) {
76 if (autoRestartTask != null)
77 return;
78
79 boolean firstStartSuccessful = startComponentsInternal();
80
81 if (!firstStartSuccessful && restartInterval > 0) {
82 autoRestartService = Executors.newSingleThreadScheduledExecutor();
83 autoRestartTask = autoRestartService.scheduleAtFixedRate(this::startComponentsInternal, restartInterval,
84 restartInterval, TimeUnit.MILLISECONDS);
85
86 }
87 }
88
89 private synchronized boolean startComponentsInternal() {
90 log.debug("Starting components start");
91 if (tpfResult == null)
92 tpfResult = TaskProviderFactory.getInstance().createInstances(configuration);
93
94 var taskProvider = tpfResult.getLeft();
95 boolean ok = tpfResult.getRight();
96
97 ok &= startProviders(taskProvider.getSinks());
98 ok &= startProviders(taskProvider.getBridges());
99 ok &= startProviders(taskProvider.getSources());
100
101 log.debug("Starting components end");
102 return ok;
103 }
104
105 private boolean startProviders(List<? extends ITaskProvider> providers) {
106 boolean ok = true;
107 for (var tp : providers) {
108 try {
109 ok &= tp.initializeTasks();
110 } catch (RuntimeException ex) {
111 ok = false;
112 log.error("An unexpected RuntimeException occured during initializeTasks", ex);
113 }
114 }
115 for (var tp : providers) {
116 try {
117 ok &= tp.startTasks();
118 } catch (RuntimeException ex) {
119 ok = false;
120 log.error("An unexpected RuntimeException occured during startTasks", ex);
121 }
122 }
123 return ok;
124 }
125
126
127
128
129 private void stopComponents(boolean terminate) {
130 log.info("Stopping all components ...");
131 if (autoRestartTask != null) {
132 autoRestartTask.cancel(false);
133 autoRestartTask = null;
134 }
135 if (autoRestartService != null) {
136 autoRestartService.shutdown();
137 autoRestartService = null;
138 }
139
140 stopComponentsInternal();
141
142 this.terminate = terminate;
143 }
144
145 private synchronized void stopComponentsInternal() {
146 if (tpfResult == null)
147 return;
148
149 var taskProvider = tpfResult.getLeft();
150
151 stopProviders(taskProvider.getSources());
152 stopProviders(taskProvider.getBridges());
153 stopProviders(taskProvider.getSinks());
154
155
156
157 tpfResult = null;
158 }
159
160 private void stopProviders(List<? extends ITaskProvider> providers) {
161 for (var tp : providers) {
162 try {
163 tp.stopTasks();
164 } catch (RuntimeException ex) {
165 log.error("An unexpected RuntimeException occured during stopTasks", ex);
166 }
167
168 }
169 for (var tp : providers) {
170 try {
171 tp.cleanupTasks();
172 } catch (RuntimeException ex) {
173 log.error("An unexpected RuntimeException occured during cleanupTasks", ex);
174 }
175
176 }
177 }
178
179 @Subscribe
180 public void onInstanceEvent(InstanceEvent ie) {
181 events.addLast(ie);
182 synchronized (this) {
183 this.notifyAll();
184 }
185 }
186
187 public void run() {
188 while (!terminate) {
189 var event = events.pollFirst();
190 if (event != null) {
191 event.handle(this);
192 } else {
193 synchronized (this) {
194 try {
195 this.wait();
196 } catch (InterruptedException ie) {
197 Thread.currentThread().interrupt();
198 }
199 }
200 }
201 }
202 }
203
204
205
206
207
208 @EqualsAndHashCode
209 public abstract static class InstanceEvent {
210 protected abstract void handle(STCInstance instance);
211 }
212
213 @Value
214 @EqualsAndHashCode(callSuper = true)
215 public static class StartEvent extends InstanceEvent {
216 private long autoRestartTime;
217
218 protected void handle(STCInstance instance) {
219 instance.startComponents(getAutoRestartTime());
220 }
221 }
222
223 @Value
224 @EqualsAndHashCode(callSuper = true)
225 public static class StopEvent extends InstanceEvent {
226 private boolean terminate;
227
228 @Override
229 protected void handle(STCInstance instance) {
230 instance.stopComponents(isTerminate());
231 }
232 }
233
234 @Value
235 @EqualsAndHashCode(callSuper = true)
236 public static class LoadConfiguration extends InstanceEvent {
237 @Override
238 protected void handle(STCInstance instance) {
239 instance.loadConfiguration();
240 }
241 }
242
243 @Value
244 @EqualsAndHashCode(callSuper = true)
245 public static class UnloadConfiguration extends InstanceEvent {
246 @Override
247 protected void handle(STCInstance instance) {
248 instance.unloadConfiguration();
249 }
250 }
251
252 @Value
253 @EqualsAndHashCode(callSuper = true)
254 public static class AddConfigurationFolder extends InstanceEvent {
255 private Path folder;
256
257 @Override
258 protected void handle(STCInstance instance) {
259 instance.addConfigurationDirectory(getFolder());
260 }
261 }
262
263 }