Bridge.java

package de.dlr.bt.stc.bridge;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;

import com.api.jsonata4java.expressions.EvaluateException;
import com.api.jsonata4java.expressions.Expressions;
import com.api.jsonata4java.expressions.ParseException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import de.dlr.bt.stc.entities.DataPoint;
import de.dlr.bt.stc.entities.Mapping;
import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.TemplateException;
import de.dlr.bt.stc.task.ITask;
import de.dlr.bt.stc.util.DateHelper;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ToString(onlyExplicitlyIncluded = true)
public class Bridge implements ITask {

	private final ConcurrentLinkedQueue<DataAvailableEvent> cache = new ConcurrentLinkedQueue<>();
	private final DateHelper dateHelper = new DateHelper();
	private final ObjectMapper mapper;
	private final EventBus eventBus;

	@ToString.Include
	@Getter
	protected final String key;
	@ToString.Include
	@Getter
	private final String sourceKey;
	@ToString.Include
	@Getter
	private final String sinkKey;

	@Getter
	private final Integer queueSize;
	@Getter
	private final Integer queueDuration;
	private final Mapping mappingTemplate;
	@Getter
	private final String valueTemplate;
	@Getter
	private Date cacheCleared = new Date(0);

	public Bridge(String key, BridgeCfg config, EventBus eventBus) {
		this.key = key;
		this.sourceKey = config.getSourceId();
		this.sinkKey = config.getSinkId();
		this.queueSize = config.getQueueSize();
		this.queueDuration = config.getQueueDuration();
		this.mappingTemplate = new Mapping(config.getMapping().getMeasurement(), config.getMapping().getLocation(),
				config.getMapping().getDevice(), config.getMapping().getSymbolicName(), config.getMapping().getField());
		this.valueTemplate = config.getValueTemplate();
		this.eventBus = eventBus;

		mapper = JsonMapper.builder().addModule(new Jdk8Module()).addModule(new JavaTimeModule()).build();
	}

	@Override
	public void initializeTask() throws ConfigurationException {
		log.debug("Initializing bridge {}", key);
		if (!validate()) {
			throw new ConfigurationException(String.format("Bridge %s is misconfigured", key));
		}
	}

	@Override
	public void startTask() {
		log.debug("Starting bridge {}", key);
		eventBus.register(this);
	}

	@Override
	public void stopTask() {
		log.debug("Stopping bridge {}", key);
		eventBus.unregister(this);
		clearCache();
	}

	@Subscribe(threadMode = ThreadMode.MAIN)
	public void onDataAvailableEvent(DataAvailableEvent event) {
		if (sourceKey.equals(event.getSourceKey())) {
			log.debug("Received DataAvailableEvent in bridge {}: {}", key, event.toString());
			cache.add(event);
		}
	}

	public boolean cacheFull() {
		if (cache.isEmpty())
			return false;

		if (queueSize > 0 && queueSize <= cache.size())
			return true;

		var cacheAge = Math.abs(dateHelper.getDate().getTime() - cacheCleared.getTime());
		return queueDuration > 0 && queueDuration <= cacheAge;
	}

	/**
	 * Get current size of cache.
	 * 
	 * This method is rather expensive (O(n)) and does not necessarily yield an
	 * exact result due to the concurrent nature of the underlying cache.
	 * 
	 * @return The approximate size of the cache queue
	 */
	public int getCurrentCacheSize() {
		return cache.size();
	}

	public void clearCache() {
		log.info("Clear cache in bridge {}", key);
		cacheCleared = dateHelper.getDate();
		Map<Mapping, List<DataPoint>> tempCache = new HashMap<>();

		DataAvailableEvent dataAvailableEvent;
		while ((dataAvailableEvent = cache.poll()) != null) {
			var mapping = fillTemplate(mappingTemplate, dataAvailableEvent.getVariableMap());
			if (!tempCache.containsKey(mapping))
				tempCache.put(mapping, new ArrayList<>());
			Object value;
			try {
				value = useValueTemplate(dataAvailableEvent.getValue());
			} catch (TemplateException ex) {
				log.error("Could not parse the value template for bridge {}, skipping this data point", this);
				continue;
			}
			tempCache.get(mapping).add(new DataPoint(dataAvailableEvent.getTimestamp(), value));
		}
		for (var entry : tempCache.entrySet()) {
			eventBus.post(new CacheFullEvent(sinkKey, entry.getValue(), entry.getKey()));
		}

	}

	private Mapping fillTemplate(Mapping template, Map<String, String> variableMap) {
		var measurement = fillTemplate(template.getMeasurement(), variableMap);
		var location = fillTemplate(template.getLocation(), variableMap);
		var device = fillTemplate(template.getDevice(), variableMap);
		var symbolicName = fillTemplate(template.getSymbolicName(), variableMap);
		var field = fillTemplate(template.getField(), variableMap);
		return new Mapping(measurement, location, device, symbolicName, field);
	}

	private String fillTemplate(String template, Map<String, String> variableMap) {
		var result = new StringBuilder();
		var cursor = 0;
		while (cursor < template.length()) {
			var temp = template.substring(cursor);
			int start = temp.indexOf("{{");
			int end = temp.indexOf("}}");
			if (start > -1 && end > -1) {
				if (end > start) {
					// Everything is as expected
					var variable = temp.substring(start + 2, end).strip();
					result.append(temp.substring(0, start));
					result.append(variableMap.getOrDefault(variable, "undefined"));
					cursor = cursor + end + 2;
				} else {
					// End <= Start, both found
					result.append(temp.substring(0, start));
					cursor = cursor + start;
				}
			} else {
				// Only the start or only the end or neither was found
				result.append(temp);
				cursor = template.length();
			}
		}
		return result.toString();
	}

	private Object useValueTemplate(Object value) throws TemplateException {
		ObjectNode node = mapper.createObjectNode();
		node.set("value", mapper.valueToTree(value));

		Object newValue;
		try {
			JsonNode result = Expressions.parse(valueTemplate).evaluate(node);
			newValue = mapper.treeToValue(result, Object.class);
		} catch (IOException | ParseException | EvaluateException e) {
			throw new TemplateException(e);
		}
		return newValue;
	}

	private boolean validate() {
		if (queueSize.equals(-1) && queueDuration.equals(-1)) {
			log.error("Either the queue size or the queue duration must be defined");
			return false;
		} else if (queueSize < -1 || queueDuration < -1) {
			log.error("Queue size and queue duration cannot be negative");
			return false;
		}
		return true;
	}

}