View Javadoc
1   package de.dlr.bt.stc.bridge;
2   
3   import java.io.IOException;
4   import java.util.ArrayList;
5   import java.util.Date;
6   import java.util.HashMap;
7   import java.util.List;
8   import java.util.Map;
9   import java.util.concurrent.ConcurrentLinkedQueue;
10  
11  import org.apache.commons.configuration2.ex.ConfigurationException;
12  import org.greenrobot.eventbus.EventBus;
13  import org.greenrobot.eventbus.Subscribe;
14  import org.greenrobot.eventbus.ThreadMode;
15  
16  import com.api.jsonata4java.expressions.EvaluateException;
17  import com.api.jsonata4java.expressions.Expressions;
18  import com.api.jsonata4java.expressions.ParseException;
19  import com.fasterxml.jackson.databind.JsonNode;
20  import com.fasterxml.jackson.databind.ObjectMapper;
21  import com.fasterxml.jackson.databind.json.JsonMapper;
22  import com.fasterxml.jackson.databind.node.ObjectNode;
23  import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
24  import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
25  
26  import de.dlr.bt.stc.entities.DataPoint;
27  import de.dlr.bt.stc.entities.Mapping;
28  import de.dlr.bt.stc.eventbus.CacheFullEvent;
29  import de.dlr.bt.stc.eventbus.DataAvailableEvent;
30  import de.dlr.bt.stc.exceptions.TemplateException;
31  import de.dlr.bt.stc.task.ITask;
32  import de.dlr.bt.stc.util.DateHelper;
33  import lombok.Getter;
34  import lombok.ToString;
35  import lombok.extern.slf4j.Slf4j;
36  
37  @Slf4j
38  @ToString(onlyExplicitlyIncluded = true)
39  public class Bridge implements ITask {
40  
41  	private final ConcurrentLinkedQueue<DataAvailableEvent> cache = new ConcurrentLinkedQueue<>();
42  	private final DateHelper dateHelper = new DateHelper();
43  	private final ObjectMapper mapper;
44  	private final EventBus eventBus;
45  
46  	@ToString.Include
47  	@Getter
48  	protected final String key;
49  	@ToString.Include
50  	@Getter
51  	private final String sourceKey;
52  	@ToString.Include
53  	@Getter
54  	private final String sinkKey;
55  
56  	@Getter
57  	private final Integer queueSize;
58  	@Getter
59  	private final Integer queueDuration;
60  	private final Mapping mappingTemplate;
61  	@Getter
62  	private final String valueTemplate;
63  	@Getter
64  	private Date cacheCleared = new Date(0);
65  
66  	public Bridge(String key, BridgeCfg config, EventBus eventBus) {
67  		this.key = key;
68  		this.sourceKey = config.getSourceId();
69  		this.sinkKey = config.getSinkId();
70  		this.queueSize = config.getQueueSize();
71  		this.queueDuration = config.getQueueDuration();
72  		this.mappingTemplate = new Mapping(config.getMapping().getMeasurement(), config.getMapping().getLocation(),
73  				config.getMapping().getDevice(), config.getMapping().getSymbolicName(), config.getMapping().getField());
74  		this.valueTemplate = config.getValueTemplate();
75  		this.eventBus = eventBus;
76  
77  		mapper = JsonMapper.builder().addModule(new Jdk8Module()).addModule(new JavaTimeModule()).build();
78  	}
79  
80  	@Override
81  	public void initializeTask() throws ConfigurationException {
82  		log.debug("Initializing bridge {}", key);
83  		if (!validate()) {
84  			throw new ConfigurationException(String.format("Bridge %s is misconfigured", key));
85  		}
86  	}
87  
88  	@Override
89  	public void startTask() {
90  		log.debug("Starting bridge {}", key);
91  		eventBus.register(this);
92  	}
93  
94  	@Override
95  	public void stopTask() {
96  		log.debug("Stopping bridge {}", key);
97  		eventBus.unregister(this);
98  		clearCache();
99  	}
100 
101 	@Subscribe(threadMode = ThreadMode.MAIN)
102 	public void onDataAvailableEvent(DataAvailableEvent event) {
103 		if (sourceKey.equals(event.getSourceKey())) {
104 			log.debug("Received DataAvailableEvent in bridge {}: {}", key, event.toString());
105 			cache.add(event);
106 		}
107 	}
108 
109 	public boolean cacheFull() {
110 		if (cache.isEmpty())
111 			return false;
112 
113 		if (queueSize > 0 && queueSize <= cache.size())
114 			return true;
115 
116 		var cacheAge = Math.abs(dateHelper.getDate().getTime() - cacheCleared.getTime());
117 		return queueDuration > 0 && queueDuration <= cacheAge;
118 	}
119 
120 	/**
121 	 * Get current size of cache.
122 	 * 
123 	 * This method is rather expensive (O(n)) and does not necessarily yield an
124 	 * exact result due to the concurrent nature of the underlying cache.
125 	 * 
126 	 * @return The approximate size of the cache queue
127 	 */
128 	public int getCurrentCacheSize() {
129 		return cache.size();
130 	}
131 
132 	public void clearCache() {
133 		log.info("Clear cache in bridge {}", key);
134 		cacheCleared = dateHelper.getDate();
135 		Map<Mapping, List<DataPoint>> tempCache = new HashMap<>();
136 
137 		DataAvailableEvent dataAvailableEvent;
138 		while ((dataAvailableEvent = cache.poll()) != null) {
139 			var mapping = fillTemplate(mappingTemplate, dataAvailableEvent.getVariableMap());
140 			if (!tempCache.containsKey(mapping))
141 				tempCache.put(mapping, new ArrayList<>());
142 			Object value;
143 			try {
144 				value = useValueTemplate(dataAvailableEvent.getValue());
145 			} catch (TemplateException ex) {
146 				log.error("Could not parse the value template for bridge {}, skipping this data point", this);
147 				continue;
148 			}
149 			tempCache.get(mapping).add(new DataPoint(dataAvailableEvent.getTimestamp(), value));
150 		}
151 		for (var entry : tempCache.entrySet()) {
152 			eventBus.post(new CacheFullEvent(sinkKey, entry.getValue(), entry.getKey()));
153 		}
154 
155 	}
156 
157 	private Mapping fillTemplate(Mapping template, Map<String, String> variableMap) {
158 		var measurement = fillTemplate(template.getMeasurement(), variableMap);
159 		var location = fillTemplate(template.getLocation(), variableMap);
160 		var device = fillTemplate(template.getDevice(), variableMap);
161 		var symbolicName = fillTemplate(template.getSymbolicName(), variableMap);
162 		var field = fillTemplate(template.getField(), variableMap);
163 		return new Mapping(measurement, location, device, symbolicName, field);
164 	}
165 
166 	private String fillTemplate(String template, Map<String, String> variableMap) {
167 		var result = new StringBuilder();
168 		var cursor = 0;
169 		while (cursor < template.length()) {
170 			var temp = template.substring(cursor);
171 			int start = temp.indexOf("{{");
172 			int end = temp.indexOf("}}");
173 			if (start > -1 && end > -1) {
174 				if (end > start) {
175 					// Everything is as expected
176 					var variable = temp.substring(start + 2, end).strip();
177 					result.append(temp.substring(0, start));
178 					result.append(variableMap.getOrDefault(variable, "undefined"));
179 					cursor = cursor + end + 2;
180 				} else {
181 					// End <= Start, both found
182 					result.append(temp.substring(0, start));
183 					cursor = cursor + start;
184 				}
185 			} else {
186 				// Only the start or only the end or neither was found
187 				result.append(temp);
188 				cursor = template.length();
189 			}
190 		}
191 		return result.toString();
192 	}
193 
194 	private Object useValueTemplate(Object value) throws TemplateException {
195 		ObjectNode node = mapper.createObjectNode();
196 		node.set("value", mapper.valueToTree(value));
197 
198 		Object newValue;
199 		try {
200 			JsonNode result = Expressions.parse(valueTemplate).evaluate(node);
201 			newValue = mapper.treeToValue(result, Object.class);
202 		} catch (IOException | ParseException | EvaluateException e) {
203 			throw new TemplateException(e);
204 		}
205 		return newValue;
206 	}
207 
208 	private boolean validate() {
209 		if (queueSize.equals(-1) && queueDuration.equals(-1)) {
210 			log.error("Either the queue size or the queue duration must be defined");
211 			return false;
212 		} else if (queueSize < -1 || queueDuration < -1) {
213 			log.error("Queue size and queue duration cannot be negative");
214 			return false;
215 		}
216 		return true;
217 	}
218 
219 }