MiloClient.java
package de.dlr.bt.stc.source.opcua;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem.ValueConsumer;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager.SubscriptionListener;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import com.google.common.base.Strings;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.exceptions.SourceException;
import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
import lombok.Data;
import lombok.ToString;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@ToString(onlyExplicitlyIncluded = true)
@Slf4j
public class MiloClient {
@ToString.Include
private MiloClientCfg config;
private OpcUaClient opcuaclient;
public MiloClient(MiloClientCfg config) throws SourceConfigurationException {
this.config = Objects.requireNonNull(config);
final IdentityProvider ip;
if (config.getCredentials() == null || Strings.isNullOrEmpty(config.getCredentials().getUsername()))
ip = new AnonymousProvider();
else
ip = new UsernameProvider(config.getCredentials().getUsername(), config.getCredentials().getPassword());
final KeyStoreLoader keystoreLoader;
if (config.getKeystorePath() != null && !config.getKeystorePath().isBlank()) {
keystoreLoader = new KeyStoreLoader();
keystoreLoader.load(new File(config.getKeystorePath()), config.isCreateKeystore());
} else {
keystoreLoader = null;
}
try {
var securitypolicy = config.getSecurityPolicy();
var endpointuri = new URI(config.getEndpoint());
opcuaclient = OpcUaClient.create(config.getEndpoint(), endpoint -> {
var exactep = endpoint.stream().filter(ep -> {
try {
var epuri = new URI(ep.getEndpointUrl());
return epuri.getScheme().equals(endpointuri.getScheme())
&& epuri.getAuthority().equalsIgnoreCase(endpointuri.getAuthority());
} catch (URISyntaxException e) {
return false;
}
}).filter(ep -> ep.getSecurityPolicyUri().equals(securitypolicy.getUri())).findFirst();
if (exactep.isPresent())
return exactep;
Optional<EndpointDescription> fep = endpoint.stream().findFirst();
if (fep.isPresent()) {
EndpointDescription mod = EndpointUtil.updateUrl(fep.get(), endpointuri.getHost());
return Optional.of(mod);
} else
return fep;
}, uaconfig -> {
var cfgb = uaconfig.setApplicationName(LocalizedText.english("Shepard Timeseries Connector"))
.setApplicationUri("urn:idms:stc").setIdentityProvider(ip);
if (keystoreLoader != null) {
cfgb.setCertificate(keystoreLoader.getClientCertificate());
cfgb.setKeyPair(keystoreLoader.getClientKeyPair());
}
return cfgb.build();
});
opcuaclient.connect().get();
opcuaclient.getSubscriptionManager().addSubscriptionListener(new SubscriptionListener() {
@Override
public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
log.info("Recreating subscriptions");
for (var entry : subscriptions.entrySet()) {
if (subscription != null && entry.getValue() != null
&& entry.getValue().getSubscriptionResult() != null
&& subscription.equals(entry.getValue().getSubscriptionResult().getSubscription())) {
entry.getValue().setSubscriptionResult(null);
}
}
updateSubscriptions();
}
@Override
public void onSubscriptionWatchdogTimerElapsed(UaSubscription subscription) {
log.info("Subscription timed out, will be recreated automatically if possible");
};
});
} catch (URISyntaxException urise) {
throw new SourceConfigurationException(urise);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (ExecutionException | UaException e) {
throw new SourceConfigurationException(e);
}
}
public static NodeId getNodeId(String nodeIdStr) {
return NodeId.parseOrNull(nodeIdStr);
}
public NodeItems resolveNodePath(String nodePath) throws SourceConfigurationException {
var split = nodePath.split("/");
try {
return browseNodePath(split, Identifiers.RootFolder, Collections.emptyMap());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SourceConfigurationException(ie);
} catch (ExecutionException e) {
throw new SourceConfigurationException(e);
}
}
private NodeItems browseNodePath(String[] nodePath, NodeId browseFolder, Map<String, String> variableMap)
throws InterruptedException, ExecutionException, SourceConfigurationException {
// No element left in node-path, should not happen?
if (nodePath.length < 1)
return new NodeItems();
String currentNodeElement = nodePath[0].trim();
// Depth of the node path is still more than one, only browse for sub-folders
final var isIntermediate = nodePath.length > 1;
UInteger nodeClasses = isIntermediate ? uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue())
: uint(NodeClass.Variable.getValue());
BrowseDescription bd = new BrowseDescription(browseFolder, BrowseDirection.Forward, Identifiers.References,
true, nodeClasses, uint(BrowseResultMask.All.getValue()));
var browseResult = opcuaclient.browse(bd).get();
StatusCode brsc = browseResult.getStatusCode();
if (brsc == null || !brsc.isGood())
throw new SourceConfigurationException(
"Could not browse OPC/UA Server, got StatusCode: " + ((brsc != null) ? brsc.toString() : "null"));
NodeItems foundNodes = new NodeItems();
if (currentNodeElement.startsWith("{{") && currentNodeElement.endsWith("}}")) {
// Use template
var template = currentNodeElement.substring(2, currentNodeElement.length() - 2);
var templateParts = template.split("\\|");
if (templateParts.length != 2)
throw new SourceConfigurationException(
"Template is invalid, need exactly two parts! Template: " + template);
var matchRegex = templateParts[0].trim();
var variableName = templateParts[1].trim();
for (var ref : browseResult.getReferences()) {
var reftext = ref.getDisplayName().getText();
if (reftext != null && reftext.matches(matchRegex)) {
var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
if (nodeId.isPresent()) {
var newVariableMap = new HashMap<>(variableMap);
newVariableMap.put(variableName, reftext);
foundNodes.put(nodeId.get(), new NodeItem(newVariableMap));
}
}
}
} else {
// directly use DisplayName
for (var ref : browseResult.getReferences()) {
String text = ref.getDisplayName().getText();
if (text != null && text.equals(currentNodeElement)) {
var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
if (nodeId.isPresent())
foundNodes.put(nodeId.get(), new NodeItem(variableMap));
}
}
}
if (isIntermediate) {
NodeItems resultList = new NodeItems();
for (var nodeItem : foundNodes.entrySet()) {
resultList.putAll(
browseNodePath(getArrayTail(nodePath), nodeItem.getKey(), nodeItem.getValue().getVarMap()));
}
return resultList;
} else {
return foundNodes;
}
}
private static String[] getArrayTail(String[] nodePath) {
if (nodePath.length < 1)
return new String[0];
return Arrays.copyOfRange(nodePath, 1, nodePath.length);
}
private final Map<NodeId, NodeSubscriptionData> subscriptions = new HashMap<>();
public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters, NodeId nodeId) {
subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
}
public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters,
Collection<NodeId> nodeIds) {
for (var nodeId : nodeIds)
subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
}
private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
var src = subscriptions.get(item.getReadValueId().getNodeId());
if (src != null && src.getSource() != null)
src.getSource().onSubscriptionValue(item, value);
}
public void updateSubscriptions() {
final Map<SubscriptionParameters, Collection<NodeId>> parameters = new HashMap<>();
for (var entry : subscriptions.entrySet()) {
if (entry.getValue().getSubscriptionResult() == null) {
var nodeIds = parameters.computeIfAbsent(entry.getValue().getSubscriptionParameters(),
pararm -> new ArrayList<>());
nodeIds.add(entry.getKey());
}
}
for (var entry : parameters.entrySet()) {
try {
var sr = subscribeNode(entry.getValue(), entry.getKey(), this::onSubscriptionValue);
for (var nodeid : entry.getValue())
subscriptions.get(nodeid).setSubscriptionResult(sr);
} catch (SourceException ex) {
log.warn("Could not create subscription {}", ex);
}
}
}
public SubscriptionResult getSubscriptionResult(NodeId nodeId) {
var nsd = subscriptions.get(nodeId);
if (nsd != null)
return nsd.getSubscriptionResult();
return null;
}
private SubscriptionResult subscribeNode(Collection<NodeId> nodes, SubscriptionParameters cfg,
ValueConsumer consumer) throws SourceException {
try {
var subscription = opcuaclient.getSubscriptionManager().createSubscription(cfg.getPublishingInterval())
.get();
List<MonitoredItemCreateRequest> monitoredItemCreateRequests = new ArrayList<>();
for (var nodeId : nodes) {
UInteger clientHandle = subscription.nextClientHandle();
MonitoringParameters parameters = new MonitoringParameters(clientHandle, cfg.getSamplingInterval(),
null, uint(cfg.getQueueSize()), true);
var readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
var monitoredItemCreateRequest = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting,
parameters);
monitoredItemCreateRequests.add(monitoredItemCreateRequest);
}
UaSubscription.ItemCreationCallback onItemCreated = (item, id) -> item.setValueConsumer(consumer);
List<UaMonitoredItem> items = subscription
.createMonitoredItems(TimestampsToReturn.Both, monitoredItemCreateRequests, onItemCreated).get();
return new SubscriptionResult(subscription, items);
} catch (ExecutionException ex) {
throw new SourceException(ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new SourceException(ex);
}
}
public void removeSubscriptions() {
Set<SubscriptionResult> results = new HashSet<>();
for (var nsd : subscriptions.values()) {
if (nsd.getSubscriptionResult() != null)
results.add(nsd.getSubscriptionResult());
nsd.setSubscriptionResult(null);
}
for (var result : results)
removeSubscription(result);
}
private void removeSubscription(SubscriptionResult sr) {
sr.getSubscription().deleteMonitoredItems(sr.getMonitoredItems());
opcuaclient.getSubscriptionManager().deleteSubscription(sr.getSubscription().getSubscriptionId());
}
@Value
public static class SubscriptionResult {
private UaSubscription subscription;
private List<UaMonitoredItem> monitoredItems;
}
public StatusCode writeNodeValue(NodeId node, Object value) {
try {
DataValue dv = new DataValue(new Variant(value), null, null);
return opcuaclient.writeValue(node, dv).get();
} catch (ExecutionException e) {
return StatusCode.BAD;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return StatusCode.BAD;
}
}
public DataValue readNodeValue(NodeId node) throws SourceException {
try {
return opcuaclient.readValue(0, TimestampsToReturn.Both, node).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SourceException("Unable to read OPC/UA node", e);
} catch (ExecutionException e) {
throw new SourceException("Unable to read OPC/UA node", e);
}
}
public void cleanup() {
try {
opcuaclient.disconnect().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// do nothing
}
}
@Data
private static class NodeSubscriptionData {
private final MiloSource source;
private final SubscriptionParameters subscriptionParameters;
private SubscriptionResult subscriptionResult = null;
}
}