RSISource.java

package de.dlr.bt.stc.source.rsi;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.greenrobot.eventbus.EventBus;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.STCException;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.source.ISource;
import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
import de.dlr.bt.stc.util.DateHelper;
import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RSISource implements ISource {

	public static final String VARMAP_NODENAME = "nodename";

	@Getter
	@Setter
	private RSIClient client;

	private final EventBus instanceEventBus;

	private final XPathExpression pathExpression;

	private final DateHelper dateHelper = new DateHelper();

	@Getter
	private final SourceRSICfg config;

	private final Consumer<String> listener;

	@Getter
	private final Map<String, RSIValue> rsiValues = Collections.synchronizedMap(new HashMap<>());

	private final List<Consumer<String>> newVariableListener = new ArrayList<>();

	public RSISource(SourceRSICfg config, EventBus instanceEventBus) throws SourceConfigurationException {
		this.config = config;
		this.instanceEventBus = instanceEventBus;
		listener = this::receiveMessage;

		XPath xPath = XPathFactory.newInstance().newXPath();
		try {
			this.pathExpression = xPath.compile(config.getPath());
		} catch (XPathExpressionException ex) {
			throw new SourceConfigurationException(ex);
		}
	}

	@Override
	public void startTask() throws STCException {
		if (client != null)
			client.registerMessageListener(listener);

	}

	void receiveMessage(String message) {
		var timestamp = dateHelper.getDate();
		try {
			var nodes = (NodeList) pathExpression.evaluate(new InputSource(new StringReader(message)),
					XPathConstants.NODESET);

			for (int i = 0; i < nodes.getLength(); i++) {
				var node = nodes.item(i);

				var nodeType = node.getNodeType();
				if (nodeType == Node.ATTRIBUTE_NODE || nodeType == Node.ELEMENT_NODE) {
					String nodeName = node.getNodeName();
					String valueStr = nodeType == Node.ATTRIBUTE_NODE ? node.getNodeValue() : node.getTextContent();

					Map<String, String> variableMap = new HashMap<>();
					variableMap.put(VARMAP_NODENAME, nodeName);

					Object valueObj = convertType(config.getDatatype(), valueStr);

					DataAvailableEvent dae = new DataAvailableEvent(config.getId(), valueObj, timestamp.getTime(),
							variableMap);
					instanceEventBus.post(dae);

					var old = rsiValues.put(nodeName, new RSIValue(timestamp, valueObj));
					if (old == null) {
						newVariableListener.forEach(consumer -> consumer.accept(nodeName));
					}
				}
			}

		} catch (XPathExpressionException e) {
			log.debug("Failed to evaluate xPath expression: {}", e);
		}
	}

	public void addNewVariableListener(Consumer<String> variableListener) {
		newVariableListener.add(variableListener);
	}

	private static Object convertType(SourceDataType dataType, String value) {
		if (dataType == null)
			return value;

		return switch (dataType) {
		case BOOL -> Boolean.valueOf(value);
		case FLOAT -> Double.valueOf(value);
		case INTEGER -> Long.valueOf(value);
		default -> value;
		};
	}

	@Override
	public void stopTask() {
		if (client != null)
			client.removeMessageListener(listener);
	}

	@Value
	public static class RSIValue {
		Date timestamp;
		Object value;
	}

}