1 package de.dlr.bt.stc.bridge;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.Date;
6 import java.util.HashMap;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.concurrent.ConcurrentLinkedQueue;
10
11 import org.apache.commons.configuration2.ex.ConfigurationException;
12 import org.greenrobot.eventbus.EventBus;
13 import org.greenrobot.eventbus.Subscribe;
14 import org.greenrobot.eventbus.ThreadMode;
15
16 import com.api.jsonata4java.expressions.EvaluateException;
17 import com.api.jsonata4java.expressions.Expressions;
18 import com.api.jsonata4java.expressions.ParseException;
19 import com.fasterxml.jackson.databind.JsonNode;
20 import com.fasterxml.jackson.databind.ObjectMapper;
21 import com.fasterxml.jackson.databind.json.JsonMapper;
22 import com.fasterxml.jackson.databind.node.ObjectNode;
23 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
24 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
25
26 import de.dlr.bt.stc.entities.DataPoint;
27 import de.dlr.bt.stc.entities.Mapping;
28 import de.dlr.bt.stc.eventbus.CacheFullEvent;
29 import de.dlr.bt.stc.eventbus.DataAvailableEvent;
30 import de.dlr.bt.stc.exceptions.TemplateException;
31 import de.dlr.bt.stc.task.ITask;
32 import de.dlr.bt.stc.util.DateHelper;
33 import lombok.Getter;
34 import lombok.ToString;
35 import lombok.extern.slf4j.Slf4j;
36
37 @Slf4j
38 @ToString(onlyExplicitlyIncluded = true)
39 public class Bridge implements ITask {
40
41 private final ConcurrentLinkedQueue<DataAvailableEvent> cache = new ConcurrentLinkedQueue<>();
42 private final DateHelper dateHelper = new DateHelper();
43 private final ObjectMapper mapper;
44 private final EventBus eventBus;
45
46 @ToString.Include
47 @Getter
48 protected final String key;
49 @ToString.Include
50 @Getter
51 private final String sourceKey;
52 @ToString.Include
53 @Getter
54 private final String sinkKey;
55
56 @Getter
57 private final Integer queueSize;
58 @Getter
59 private final Integer queueDuration;
60 private final Mapping mappingTemplate;
61 @Getter
62 private final String valueTemplate;
63 @Getter
64 private Date cacheCleared = new Date(0);
65
66 public Bridge(String key, BridgeCfg config, EventBus eventBus) {
67 this.key = key;
68 this.sourceKey = config.getSourceId();
69 this.sinkKey = config.getSinkId();
70 this.queueSize = config.getQueueSize();
71 this.queueDuration = config.getQueueDuration();
72 this.mappingTemplate = new Mapping(config.getMapping().getMeasurement(), config.getMapping().getLocation(),
73 config.getMapping().getDevice(), config.getMapping().getSymbolicName(), config.getMapping().getField());
74 this.valueTemplate = config.getValueTemplate();
75 this.eventBus = eventBus;
76
77 mapper = JsonMapper.builder().addModule(new Jdk8Module()).addModule(new JavaTimeModule()).build();
78 }
79
80 @Override
81 public void initializeTask() throws ConfigurationException {
82 log.debug("Initializing bridge {}", key);
83 if (!validate()) {
84 throw new ConfigurationException(String.format("Bridge %s is misconfigured", key));
85 }
86 }
87
88 @Override
89 public void startTask() {
90 log.debug("Starting bridge {}", key);
91 eventBus.register(this);
92 }
93
94 @Override
95 public void stopTask() {
96 log.debug("Stopping bridge {}", key);
97 eventBus.unregister(this);
98 clearCache();
99 }
100
101 @Subscribe(threadMode = ThreadMode.MAIN)
102 public void onDataAvailableEvent(DataAvailableEvent event) {
103 if (sourceKey.equals(event.getSourceKey())) {
104 log.debug("Received DataAvailableEvent in bridge {}: {}", key, event.toString());
105 cache.add(event);
106 }
107 }
108
109 public boolean cacheFull() {
110 if (cache.isEmpty())
111 return false;
112
113 if (queueSize > 0 && queueSize <= cache.size())
114 return true;
115
116 var cacheAge = Math.abs(dateHelper.getDate().getTime() - cacheCleared.getTime());
117 return queueDuration > 0 && queueDuration <= cacheAge;
118 }
119
120
121
122
123
124
125
126
127
128 public int getCurrentCacheSize() {
129 return cache.size();
130 }
131
132 public void clearCache() {
133 log.info("Clear cache in bridge {}", key);
134 cacheCleared = dateHelper.getDate();
135 Map<Mapping, List<DataPoint>> tempCache = new HashMap<>();
136
137 DataAvailableEvent dataAvailableEvent;
138 while ((dataAvailableEvent = cache.poll()) != null) {
139 var mapping = fillTemplate(mappingTemplate, dataAvailableEvent.getVariableMap());
140 if (!tempCache.containsKey(mapping))
141 tempCache.put(mapping, new ArrayList<>());
142 Object value;
143 try {
144 value = useValueTemplate(dataAvailableEvent.getValue());
145 } catch (TemplateException ex) {
146 log.error("Could not parse the value template for bridge {}, skipping this data point", this);
147 continue;
148 }
149 tempCache.get(mapping).add(new DataPoint(dataAvailableEvent.getTimestamp(), value));
150 }
151 for (var entry : tempCache.entrySet()) {
152 eventBus.post(new CacheFullEvent(sinkKey, entry.getValue(), entry.getKey()));
153 }
154
155 }
156
157 private Mapping fillTemplate(Mapping template, Map<String, String> variableMap) {
158 var measurement = fillTemplate(template.getMeasurement(), variableMap);
159 var location = fillTemplate(template.getLocation(), variableMap);
160 var device = fillTemplate(template.getDevice(), variableMap);
161 var symbolicName = fillTemplate(template.getSymbolicName(), variableMap);
162 var field = fillTemplate(template.getField(), variableMap);
163 return new Mapping(measurement, location, device, symbolicName, field);
164 }
165
166 private String fillTemplate(String template, Map<String, String> variableMap) {
167 var result = new StringBuilder();
168 var cursor = 0;
169 while (cursor < template.length()) {
170 var temp = template.substring(cursor);
171 int start = temp.indexOf("{{");
172 int end = temp.indexOf("}}");
173 if (start > -1 && end > -1) {
174 if (end > start) {
175
176 var variable = temp.substring(start + 2, end).strip();
177 result.append(temp.substring(0, start));
178 result.append(variableMap.getOrDefault(variable, "undefined"));
179 cursor = cursor + end + 2;
180 } else {
181
182 result.append(temp.substring(0, start));
183 cursor = cursor + start;
184 }
185 } else {
186
187 result.append(temp);
188 cursor = template.length();
189 }
190 }
191 return result.toString();
192 }
193
194 private Object useValueTemplate(Object value) throws TemplateException {
195 ObjectNode node = mapper.createObjectNode();
196 node.set("value", mapper.valueToTree(value));
197
198 Object newValue;
199 try {
200 JsonNode result = Expressions.parse(valueTemplate).evaluate(node);
201 newValue = mapper.treeToValue(result, Object.class);
202 } catch (IOException | ParseException | EvaluateException e) {
203 throw new TemplateException(e);
204 }
205 return newValue;
206 }
207
208 private boolean validate() {
209 if (queueSize.equals(-1) && queueDuration.equals(-1)) {
210 log.error("Either the queue size or the queue duration must be defined");
211 return false;
212 } else if (queueSize < -1 || queueDuration < -1) {
213 log.error("Queue size and queue duration cannot be negative");
214 return false;
215 }
216 return true;
217 }
218
219 }