Bridge.java
package de.dlr.bt.stc.bridge;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;
import com.api.jsonata4java.expressions.EvaluateException;
import com.api.jsonata4java.expressions.Expressions;
import com.api.jsonata4java.expressions.ParseException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.dlr.bt.stc.entities.DataPoint;
import de.dlr.bt.stc.entities.Mapping;
import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.TemplateException;
import de.dlr.bt.stc.task.ITask;
import de.dlr.bt.stc.util.DateHelper;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ToString(onlyExplicitlyIncluded = true)
public class Bridge implements ITask {
private final ConcurrentLinkedQueue<DataAvailableEvent> cache = new ConcurrentLinkedQueue<>();
private final DateHelper dateHelper = new DateHelper();
private final ObjectMapper mapper;
private final EventBus eventBus;
@ToString.Include
@Getter
protected final String key;
@ToString.Include
@Getter
private final String sourceKey;
@ToString.Include
@Getter
private final String sinkKey;
@Getter
private final Integer queueSize;
@Getter
private final Integer queueDuration;
private final Mapping mappingTemplate;
@Getter
private final String valueTemplate;
@Getter
private Date cacheCleared = new Date(0);
public Bridge(String key, BridgeCfg config, EventBus eventBus) {
this.key = key;
this.sourceKey = config.getSourceId();
this.sinkKey = config.getSinkId();
this.queueSize = config.getQueueSize();
this.queueDuration = config.getQueueDuration();
this.mappingTemplate = new Mapping(config.getMapping().getMeasurement(), config.getMapping().getLocation(),
config.getMapping().getDevice(), config.getMapping().getSymbolicName(), config.getMapping().getField());
this.valueTemplate = config.getValueTemplate();
this.eventBus = eventBus;
mapper = JsonMapper.builder().addModule(new Jdk8Module()).addModule(new JavaTimeModule()).build();
}
@Override
public void initializeTask() throws ConfigurationException {
log.debug("Initializing bridge {}", key);
if (!validate()) {
throw new ConfigurationException(String.format("Bridge %s is misconfigured", key));
}
}
@Override
public void startTask() {
log.debug("Starting bridge {}", key);
eventBus.register(this);
}
@Override
public void stopTask() {
log.debug("Stopping bridge {}", key);
eventBus.unregister(this);
clearCache();
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onDataAvailableEvent(DataAvailableEvent event) {
if (sourceKey.equals(event.getSourceKey())) {
log.debug("Received DataAvailableEvent in bridge {}: {}", key, event.toString());
cache.add(event);
}
}
public boolean cacheFull() {
if (cache.isEmpty())
return false;
if (queueSize > 0 && queueSize <= cache.size())
return true;
var cacheAge = Math.abs(dateHelper.getDate().getTime() - cacheCleared.getTime());
return queueDuration > 0 && queueDuration <= cacheAge;
}
/**
* Get current size of cache.
*
* This method is rather expensive (O(n)) and does not necessarily yield an
* exact result due to the concurrent nature of the underlying cache.
*
* @return The approximate size of the cache queue
*/
public int getCurrentCacheSize() {
return cache.size();
}
public void clearCache() {
log.info("Clear cache in bridge {}", key);
cacheCleared = dateHelper.getDate();
Map<Mapping, List<DataPoint>> tempCache = new HashMap<>();
DataAvailableEvent dataAvailableEvent;
while ((dataAvailableEvent = cache.poll()) != null) {
var mapping = fillTemplate(mappingTemplate, dataAvailableEvent.getVariableMap());
if (!tempCache.containsKey(mapping))
tempCache.put(mapping, new ArrayList<>());
Object value;
try {
value = useValueTemplate(dataAvailableEvent.getValue());
} catch (TemplateException ex) {
log.error("Could not parse the value template for bridge {}, skipping this data point", this);
continue;
}
tempCache.get(mapping).add(new DataPoint(dataAvailableEvent.getTimestamp(), value));
}
for (var entry : tempCache.entrySet()) {
eventBus.post(new CacheFullEvent(sinkKey, entry.getValue(), entry.getKey()));
}
}
private Mapping fillTemplate(Mapping template, Map<String, String> variableMap) {
var measurement = fillTemplate(template.getMeasurement(), variableMap);
var location = fillTemplate(template.getLocation(), variableMap);
var device = fillTemplate(template.getDevice(), variableMap);
var symbolicName = fillTemplate(template.getSymbolicName(), variableMap);
var field = fillTemplate(template.getField(), variableMap);
return new Mapping(measurement, location, device, symbolicName, field);
}
private String fillTemplate(String template, Map<String, String> variableMap) {
var result = new StringBuilder();
var cursor = 0;
while (cursor < template.length()) {
var temp = template.substring(cursor);
int start = temp.indexOf("{{");
int end = temp.indexOf("}}");
if (start > -1 && end > -1) {
if (end > start) {
// Everything is as expected
var variable = temp.substring(start + 2, end).strip();
result.append(temp.substring(0, start));
result.append(variableMap.getOrDefault(variable, "undefined"));
cursor = cursor + end + 2;
} else {
// End <= Start, both found
result.append(temp.substring(0, start));
cursor = cursor + start;
}
} else {
// Only the start or only the end or neither was found
result.append(temp);
cursor = template.length();
}
}
return result.toString();
}
private Object useValueTemplate(Object value) throws TemplateException {
ObjectNode node = mapper.createObjectNode();
node.set("value", mapper.valueToTree(value));
Object newValue;
try {
JsonNode result = Expressions.parse(valueTemplate).evaluate(node);
newValue = mapper.treeToValue(result, Object.class);
} catch (IOException | ParseException | EvaluateException e) {
throw new TemplateException(e);
}
return newValue;
}
private boolean validate() {
if (queueSize.equals(-1) && queueDuration.equals(-1)) {
log.error("Either the queue size or the queue duration must be defined");
return false;
} else if (queueSize < -1 || queueDuration < -1) {
log.error("Queue size and queue duration cannot be negative");
return false;
}
return true;
}
}