MiloClient.java

package de.dlr.bt.stc.source.opcua;

import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem.ValueConsumer;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager.SubscriptionListener;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;

import com.google.common.base.Strings;

import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.exceptions.SourceException;
import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
import lombok.Data;
import lombok.ToString;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@ToString(onlyExplicitlyIncluded = true)
@Slf4j
public class MiloClient {
	@ToString.Include
	private MiloClientCfg config;

	private OpcUaClient opcuaclient;

	public MiloClient(MiloClientCfg config) throws SourceConfigurationException {
		this.config = Objects.requireNonNull(config);

		final IdentityProvider ip;
		if (config.getCredentials() == null || Strings.isNullOrEmpty(config.getCredentials().getUsername()))
			ip = new AnonymousProvider();
		else
			ip = new UsernameProvider(config.getCredentials().getUsername(), config.getCredentials().getPassword());

		final KeyStoreLoader keystoreLoader;
		if (config.getKeystorePath() != null && !config.getKeystorePath().isBlank()) {
			keystoreLoader = new KeyStoreLoader();
			keystoreLoader.load(new File(config.getKeystorePath()), config.isCreateKeystore());
		} else {
			keystoreLoader = null;
		}

		try {
			var securitypolicy = config.getSecurityPolicy();
			var endpointuri = new URI(config.getEndpoint());

			opcuaclient = OpcUaClient.create(config.getEndpoint(), endpoint -> {
				var exactep = endpoint.stream().filter(ep -> {
					try {
						var epuri = new URI(ep.getEndpointUrl());

						return epuri.getScheme().equals(endpointuri.getScheme())
								&& epuri.getAuthority().equalsIgnoreCase(endpointuri.getAuthority());
					} catch (URISyntaxException e) {
						return false;
					}
				}).filter(ep -> ep.getSecurityPolicyUri().equals(securitypolicy.getUri())).findFirst();

				if (exactep.isPresent())
					return exactep;

				Optional<EndpointDescription> fep = endpoint.stream().findFirst();
				if (fep.isPresent()) {
					EndpointDescription mod = EndpointUtil.updateUrl(fep.get(), endpointuri.getHost());
					return Optional.of(mod);
				} else
					return fep;
			}, uaconfig -> {
				var cfgb = uaconfig.setApplicationName(LocalizedText.english("Shepard Timeseries Connector"))
						.setApplicationUri("urn:idms:stc").setIdentityProvider(ip);
				if (keystoreLoader != null) {
					cfgb.setCertificate(keystoreLoader.getClientCertificate());
					cfgb.setKeyPair(keystoreLoader.getClientKeyPair());
				}

				return cfgb.build();

			});

			opcuaclient.connect().get();

			opcuaclient.getSubscriptionManager().addSubscriptionListener(new SubscriptionListener() {
				@Override
				public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
					log.info("Recreating subscriptions");

					for (var entry : subscriptions.entrySet()) {
						if (subscription != null && entry.getValue() != null
								&& entry.getValue().getSubscriptionResult() != null
								&& subscription.equals(entry.getValue().getSubscriptionResult().getSubscription())) {
							entry.getValue().setSubscriptionResult(null);
						}
					}
					updateSubscriptions();
				}

				@Override
				public void onSubscriptionWatchdogTimerElapsed(UaSubscription subscription) {
					log.info("Subscription timed out, will be recreated automatically if possible");
				};
			});
		} catch (URISyntaxException urise) {
			throw new SourceConfigurationException(urise);
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
		} catch (ExecutionException | UaException e) {
			throw new SourceConfigurationException(e);
		}
	}

	public static NodeId getNodeId(String nodeIdStr) {
		return NodeId.parseOrNull(nodeIdStr);
	}

	public NodeItems resolveNodePath(String nodePath) throws SourceConfigurationException {

		var split = nodePath.split("/");

		try {
			return browseNodePath(split, Identifiers.RootFolder, Collections.emptyMap());

		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
			throw new SourceConfigurationException(ie);
		} catch (ExecutionException e) {
			throw new SourceConfigurationException(e);
		}

	}

	private NodeItems browseNodePath(String[] nodePath, NodeId browseFolder, Map<String, String> variableMap)
			throws InterruptedException, ExecutionException, SourceConfigurationException {
		// No element left in node-path, should not happen?
		if (nodePath.length < 1)
			return new NodeItems();

		String currentNodeElement = nodePath[0].trim();

		// Depth of the node path is still more than one, only browse for sub-folders
		final var isIntermediate = nodePath.length > 1;

		UInteger nodeClasses = isIntermediate ? uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue())
				: uint(NodeClass.Variable.getValue());

		BrowseDescription bd = new BrowseDescription(browseFolder, BrowseDirection.Forward, Identifiers.References,
				true, nodeClasses, uint(BrowseResultMask.All.getValue()));

		var browseResult = opcuaclient.browse(bd).get();
		StatusCode brsc = browseResult.getStatusCode();
		if (brsc == null || !brsc.isGood())
			throw new SourceConfigurationException(
					"Could not browse OPC/UA Server, got StatusCode: " + ((brsc != null) ? brsc.toString() : "null"));

		NodeItems foundNodes = new NodeItems();

		if (currentNodeElement.startsWith("{{") && currentNodeElement.endsWith("}}")) {
			// Use template
			var template = currentNodeElement.substring(2, currentNodeElement.length() - 2);
			var templateParts = template.split("\\|");
			if (templateParts.length != 2)
				throw new SourceConfigurationException(
						"Template is invalid, need exactly two parts! Template: " + template);

			var matchRegex = templateParts[0].trim();
			var variableName = templateParts[1].trim();

			for (var ref : browseResult.getReferences()) {
				var reftext = ref.getDisplayName().getText();
				if (reftext != null && reftext.matches(matchRegex)) {
					var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
					if (nodeId.isPresent()) {
						var newVariableMap = new HashMap<>(variableMap);
						newVariableMap.put(variableName, reftext);

						foundNodes.put(nodeId.get(), new NodeItem(newVariableMap));
					}
				}
			}

		} else {
			// directly use DisplayName
			for (var ref : browseResult.getReferences()) {
				String text = ref.getDisplayName().getText();
				if (text != null && text.equals(currentNodeElement)) {
					var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
					if (nodeId.isPresent())
						foundNodes.put(nodeId.get(), new NodeItem(variableMap));
				}
			}
		}

		if (isIntermediate) {
			NodeItems resultList = new NodeItems();
			for (var nodeItem : foundNodes.entrySet()) {
				resultList.putAll(
						browseNodePath(getArrayTail(nodePath), nodeItem.getKey(), nodeItem.getValue().getVarMap()));
			}
			return resultList;
		} else {
			return foundNodes;
		}

	}

	private static String[] getArrayTail(String[] nodePath) {
		if (nodePath.length < 1)
			return new String[0];

		return Arrays.copyOfRange(nodePath, 1, nodePath.length);
	}

	private final Map<NodeId, NodeSubscriptionData> subscriptions = new HashMap<>();

	public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters, NodeId nodeId) {
		subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
	}

	public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters,
			Collection<NodeId> nodeIds) {
		for (var nodeId : nodeIds)
			subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
	}

	private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
		var src = subscriptions.get(item.getReadValueId().getNodeId());
		if (src != null && src.getSource() != null)
			src.getSource().onSubscriptionValue(item, value);
	}

	public void updateSubscriptions() {
		final Map<SubscriptionParameters, Collection<NodeId>> parameters = new HashMap<>();

		for (var entry : subscriptions.entrySet()) {
			if (entry.getValue().getSubscriptionResult() == null) {
				var nodeIds = parameters.computeIfAbsent(entry.getValue().getSubscriptionParameters(),
						pararm -> new ArrayList<>());
				nodeIds.add(entry.getKey());
			}
		}

		for (var entry : parameters.entrySet()) {
			try {
				var sr = subscribeNode(entry.getValue(), entry.getKey(), this::onSubscriptionValue);

				for (var nodeid : entry.getValue())
					subscriptions.get(nodeid).setSubscriptionResult(sr);

			} catch (SourceException ex) {
				log.warn("Could not create subscription {}", ex);
			}
		}

	}

	public SubscriptionResult getSubscriptionResult(NodeId nodeId) {
		var nsd = subscriptions.get(nodeId);
		if (nsd != null)
			return nsd.getSubscriptionResult();
		return null;
	}

	private SubscriptionResult subscribeNode(Collection<NodeId> nodes, SubscriptionParameters cfg,
			ValueConsumer consumer) throws SourceException {
		try {
			var subscription = opcuaclient.getSubscriptionManager().createSubscription(cfg.getPublishingInterval())
					.get();

			List<MonitoredItemCreateRequest> monitoredItemCreateRequests = new ArrayList<>();
			for (var nodeId : nodes) {
				UInteger clientHandle = subscription.nextClientHandle();

				MonitoringParameters parameters = new MonitoringParameters(clientHandle, cfg.getSamplingInterval(),
						null, uint(cfg.getQueueSize()), true);
				var readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
				var monitoredItemCreateRequest = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting,
						parameters);
				monitoredItemCreateRequests.add(monitoredItemCreateRequest);
			}

			UaSubscription.ItemCreationCallback onItemCreated = (item, id) -> item.setValueConsumer(consumer);

			List<UaMonitoredItem> items = subscription
					.createMonitoredItems(TimestampsToReturn.Both, monitoredItemCreateRequests, onItemCreated).get();

			return new SubscriptionResult(subscription, items);

		} catch (ExecutionException ex) {
			throw new SourceException(ex);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			throw new SourceException(ex);
		}
	}

	public void removeSubscriptions() {
		Set<SubscriptionResult> results = new HashSet<>();
		for (var nsd : subscriptions.values()) {
			if (nsd.getSubscriptionResult() != null)
				results.add(nsd.getSubscriptionResult());
			nsd.setSubscriptionResult(null);
		}

		for (var result : results)
			removeSubscription(result);

	}

	private void removeSubscription(SubscriptionResult sr) {
		sr.getSubscription().deleteMonitoredItems(sr.getMonitoredItems());

		opcuaclient.getSubscriptionManager().deleteSubscription(sr.getSubscription().getSubscriptionId());
	}

	@Value
	public static class SubscriptionResult {
		private UaSubscription subscription;
		private List<UaMonitoredItem> monitoredItems;
	}

	public StatusCode writeNodeValue(NodeId node, Object value) {
		try {
			DataValue dv = new DataValue(new Variant(value), null, null);

			return opcuaclient.writeValue(node, dv).get();
		} catch (ExecutionException e) {
			return StatusCode.BAD;
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
			return StatusCode.BAD;
		}
	}

	public DataValue readNodeValue(NodeId node) throws SourceException {
		try {
			return opcuaclient.readValue(0, TimestampsToReturn.Both, node).get();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new SourceException("Unable to read OPC/UA node", e);
		} catch (ExecutionException e) {
			throw new SourceException("Unable to read OPC/UA node", e);
		}
	}

	public void cleanup() {
		try {
			opcuaclient.disconnect().get();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			// do nothing
		}

	}

	@Data
	private static class NodeSubscriptionData {
		private final MiloSource source;
		private final SubscriptionParameters subscriptionParameters;
		private SubscriptionResult subscriptionResult = null;
	}
}