// RSISource.java
package de.dlr.bt.stc.source.rsi;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.greenrobot.eventbus.EventBus;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.STCException;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.source.ISource;
import de.dlr.bt.stc.source.rsi.SourceRSICfg.SourceDataType;
import de.dlr.bt.stc.util.DateHelper;
import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RSISource implements ISource {
public static final String VARMAP_NODENAME = "nodename";
@Getter
@Setter
private RSIClient client;
private final EventBus instanceEventBus;
private final XPathExpression pathExpression;
private final DateHelper dateHelper = new DateHelper();
@Getter
private final SourceRSICfg config;
private final Consumer<String> listener;
@Getter
private final Map<String, RSIValue> rsiValues = Collections.synchronizedMap(new HashMap<>());
private final List<Consumer<String>> newVariableListener = new ArrayList<>();
public RSISource(SourceRSICfg config, EventBus instanceEventBus) throws SourceConfigurationException {
this.config = config;
this.instanceEventBus = instanceEventBus;
listener = this::receiveMessage;
XPath xPath = XPathFactory.newInstance().newXPath();
try {
this.pathExpression = xPath.compile(config.getPath());
} catch (XPathExpressionException ex) {
throw new SourceConfigurationException(ex);
}
}
@Override
public void startTask() throws STCException {
if (client != null)
client.registerMessageListener(listener);
}
void receiveMessage(String message) {
var timestamp = dateHelper.getDate();
try {
var nodes = (NodeList) pathExpression.evaluate(new InputSource(new StringReader(message)),
XPathConstants.NODESET);
for (int i = 0; i < nodes.getLength(); i++) {
var node = nodes.item(i);
var nodeType = node.getNodeType();
if (nodeType == Node.ATTRIBUTE_NODE || nodeType == Node.ELEMENT_NODE) {
String nodeName = node.getNodeName();
String valueStr = nodeType == Node.ATTRIBUTE_NODE ? node.getNodeValue() : node.getTextContent();
Map<String, String> variableMap = new HashMap<>();
variableMap.put(VARMAP_NODENAME, nodeName);
Object valueObj = convertType(config.getDatatype(), valueStr);
DataAvailableEvent dae = new DataAvailableEvent(config.getId(), valueObj, timestamp.getTime(),
variableMap);
instanceEventBus.post(dae);
var old = rsiValues.put(nodeName, new RSIValue(timestamp, valueObj));
if (old == null) {
newVariableListener.forEach(consumer -> consumer.accept(nodeName));
}
}
}
} catch (XPathExpressionException e) {
log.debug("Failed to evaluate xPath expression: {}", e);
}
}
public void addNewVariableListener(Consumer<String> variableListener) {
newVariableListener.add(variableListener);
}
private static Object convertType(SourceDataType dataType, String value) {
if (dataType == null)
return value;
return switch (dataType) {
case BOOL -> Boolean.valueOf(value);
case FLOAT -> Double.valueOf(value);
case INTEGER -> Long.valueOf(value);
default -> value;
};
}
@Override
public void stopTask() {
if (client != null)
client.removeMessageListener(listener);
}
@Value
public static class RSIValue {
Date timestamp;
Object value;
}
}