// MiloSourceNode.java

package de.dlr.bt.stc.source.opcua.uas;

import java.util.Optional;
import java.util.function.Function;

import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.server.nodes.UaObjectNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaObjectTypeNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
import org.eclipse.milo.opcua.sdk.server.nodes.filters.AttributeFilters;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;

import de.dlr.bt.stc.init.Register;
import de.dlr.bt.stc.opcuaserver.ANodeCreator;
import de.dlr.bt.stc.opcuaserver.NodeFactory;
import de.dlr.bt.stc.opcuaserver.STCNamespace;
import de.dlr.bt.stc.opcuaserver.STCNamespace.Folders;
import de.dlr.bt.stc.source.opcua.MiloSource;
import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
import de.dlr.bt.stc.source.opcua.SourceOPCUACfg.CaptureMode;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Node creator that publishes {@link MiloSource} instances in the STC OPC UA
 * server namespace.
 * <p>
 * {@link #createObjectType()} builds two object types (one for a source node,
 * one for its optional subscription details); {@link #createInstance(Object, Folders)}
 * instantiates them for every node of a concrete {@link MiloSource}.
 */
@Slf4j
public class MiloSourceNode extends ANodeCreator {

	/**
	 * Registers this creator with the {@link NodeFactory} for {@link MiloSource}
	 * objects.
	 */
	@Register
	public static void register() {
		NodeFactory.getInstance().registerCreator(MiloSource.class, MiloSourceNode::new);
	}

	// Browse names of the source object type and its components
	private static final String MILOSOURCE_TYPE = "MiloSourceType";
	private static final String VALUE = "Value";
	private static final String LAST_UPDATE = "LastUpdate";
	private static final String SUBSCRIPTION = "Subscription";

	// Browse names of the subscription object type and its components
	private static final String SUBSCRIPTION_TYPE = "MiloSourceSubscriptionType";
	private static final String STATUS = "Status";
	private static final String REQUESTEDSAMPLINGINTERVAL = "RequestedSamplingInterval";
	private static final String REVISEDSAMPLINGINTERVAL = "RevisedSamplingInterval";
	private static final String REQUESTEDQUEUESIZE = "RequestedQueueSize";
	private static final String REVISEDQUEUESIZE = "RevisedQueueSize";

	// Name of the folder holding the variable mapping of a node
	private static final String MAPPING = "Mapping";

	/** Object type of a single source node; stays {@code null} if type creation failed. */
	private UaObjectTypeNode sourceTypeNode;
	/** Object type of the subscription details; stays {@code null} if type creation failed. */
	private UaObjectTypeNode subscriptionTypeNode;

	private MiloSourceNode(STCNamespace namespace) {
		super(namespace, "stc", "sources", "milosource");
	}

	/**
	 * Creates the subscription object type and the source object type (which
	 * references the former as an optional component) and stores both in this
	 * instance. A {@link UaException} is logged and not propagated.
	 */
	@Override
	public void createObjectType() {
		try {
			var status = namespace.createObjectTypeComponent(STATUS, namespace.newNodeId(nodePathType(STATUS)),
					Identifiers.String);
			var requestedsampling = namespace.createObjectTypeComponent(REQUESTEDSAMPLINGINTERVAL,
					namespace.newNodeId(nodePathType(REQUESTEDSAMPLINGINTERVAL)), Identifiers.Double);
			var revisedsampling = namespace.createObjectTypeComponent(REVISEDSAMPLINGINTERVAL,
					namespace.newNodeId(nodePathType(REVISEDSAMPLINGINTERVAL)), Identifiers.Double);
			var requestedqueue = namespace.createObjectTypeComponent(REQUESTEDQUEUESIZE,
					namespace.newNodeId(nodePathType(REQUESTEDQUEUESIZE)), Identifiers.UInteger);
			var revisedqueue = namespace.createObjectTypeComponent(REVISEDQUEUESIZE,
					namespace.newNodeId(nodePathType(REVISEDQUEUESIZE)), Identifiers.UInteger);

			// NOTE(review): unlike every other type id in this method, this one is not
			// built via nodePathType(...) - confirm whether that is intentional.
			subscriptionTypeNode = namespace.createObjectTypeNode(SUBSCRIPTION_TYPE,
					namespace.newNodeId(SUBSCRIPTION_TYPE), status, requestedsampling, revisedsampling, requestedqueue,
					revisedqueue);

			// The value's data type is only known once a value arrived, see getLastValue()
			var value = namespace.createObjectTypeComponent(VALUE, namespace.newNodeId(nodePathType(VALUE)),
					NULLNODEID);
			var lastupdate = namespace.createObjectTypeComponent(LAST_UPDATE,
					namespace.newNodeId(nodePathType(LAST_UPDATE)), Identifiers.DateTime);
			var subscription = namespace.createObjectTypeComponent(SUBSCRIPTION,
					namespace.newNodeId(nodePathType(SUBSCRIPTION)), subscriptionTypeNode,
					Identifiers.ModellingRule_Optional);

			sourceTypeNode = namespace.createObjectTypeNode(MILOSOURCE_TYPE,
					namespace.newNodeId(nodePathType(MILOSOURCE_TYPE)), value, lastupdate, subscription);
		} catch (UaException uae) {
			// The throwable goes last without a placeholder so SLF4J logs it as the cause
			log.warn("Could not create ObjectTypes", uae);
		}

	}

	/**
	 * Creates the server-side representation of one {@link MiloSource}: a folder
	 * named after the source id containing one object per configured node, each
	 * with its mapping, current value, last update time and - in subscription
	 * capture mode - subscription details.
	 * <p>
	 * A {@link UaException} for one node is logged and the remaining nodes are
	 * still processed.
	 *
	 * @param forNode the object to represent; anything but a {@link MiloSource} is ignored
	 * @param folders provides the parent folder for source instances
	 */
	@Override
	public void createInstance(Object forNode, Folders folders) {
		if (!(forNode instanceof MiloSource miloSource)) {
			return;
		}

		var id = miloSource.getConfig().getId();

		var sfldr = namespace.createFolderNode(id, namespace.newNodeId(nodePathInst(id)),
				folders.getSourceFolder().getNodeId());

		for (var entry : miloSource.getNodes().entrySet()) {
			var nodeidstr = entry.getKey().toParseableString();
			try {
				// NOTE(review): sourceTypeNode is null when createObjectType() failed or was
				// not called before - confirm that createObjectNode copes with that.
				var node = namespace.createObjectNode(sourceTypeNode, nodeidstr,
						namespace.newNodeId(nodePathInst(id, nodeidstr)), sfldr);

				// Mapping: one String variable per map entry, organized in a folder
				if (!entry.getValue().getVarMap().isEmpty()) {

					var mappingFolder = namespace.createFolderNode(MAPPING,
							namespace.newNodeId(nodePathInst(id, nodeidstr, MAPPING)), node.getNodeId());

					for (var mapping : entry.getValue().getVarMap().entrySet()) {
						var mnode = namespace.createVariableNode(mapping.getKey(),
								namespace.newNodeId(nodePathInst(id, nodeidstr, MAPPING, mapping.getKey())),
								Identifiers.String);
						mnode.setValue(new DataValue(new Variant(mapping.getValue())));

						mappingFolder.addOrganizes(mnode);
					}
				}

				// Current value: resolved on every read of the Value attribute
				var valueNode = namespace.getObjectNodeComponent(node, VALUE);

				valueNode.getFilterChain()
						.addLast(AttributeFilters.getValue(ctx -> getLastValue(entry.getValue(), valueNode)));

				// Last value update: resolved on every read of the Value attribute
				var lastUpdateNode = namespace.getObjectNodeComponent(node, LAST_UPDATE);

				lastUpdateNode.getFilterChain().addLast(AttributeFilters.getValue(ctx -> {
					DateTime lastUpdate = new DateTime(entry.getValue().getLastUpdate());
					return new DataValue(new Variant(lastUpdate));
				}));

				// Subscription information only exists when values are captured by subscription
				if (miloSource.getConfig().getCaptureMode() == CaptureMode.SUBSCRIPTION) {
					createSubscriptionNodes(node, miloSource, entry.getKey(), id);
				}
			} catch (UaException uae) {
				// Only the node id gets a placeholder; the trailing throwable is logged as cause
				log.warn("Could not create MiloSource node instance {}", nodeidstr, uae);
			}
		}

		addRootNode(forNode, sfldr);
	}

	/**
	 * Returns the last value of {@code node} and, as long as the server node still
	 * has the placeholder data type, sets the server node's data type from that
	 * value.
	 *
	 * @param node       item holding the last captured value
	 * @param servernode server variable node whose data type may be updated
	 * @return the last value of {@code node}; {@code null} if it has none
	 */
	private static DataValue getLastValue(NodeItem node, UaVariableNode servernode) {
		// Read exactly once: the data type must be derived from the value that is
		// actually returned, even if the item is updated concurrently.
		final DataValue lastValue = node.getLastValue();

		if (lastValue != null && NULLNODEID.equals(servernode.getDataType())) {
			lastValue.getValue().getDataType().flatMap(eni -> eni.toNodeId(null)).ifPresent(servernode::setDataType);
		}
		return lastValue;
	}

	/**
	 * Creates the subscription object below {@code nsNode} and binds each of its
	 * components to a getter of a {@link SubscriptionHandler} for the given node.
	 *
	 * @param nsNode     parent object node of the subscription object
	 * @param miloSource source owning the subscription
	 * @param nid        id of the monitored node
	 * @param id         id of the source, used to build the instance node path
	 * @throws UaException if a node could not be created
	 */
	private void createSubscriptionNodes(UaObjectNode nsNode, MiloSource miloSource, NodeId nid, String id)
			throws UaException {
		var subnode = namespace.createObjectNode(subscriptionTypeNode, SUBSCRIPTION,
				namespace.newNodeId(nodePathInst(id, nid.toParseableString(), SUBSCRIPTION)), nsNode);

		SubscriptionHandler sh = new SubscriptionHandler(miloSource, nid);

		namespace.setObjectNodeComponent(subnode, STATUS, ctx -> sh.getStatus());
		namespace.setObjectNodeComponent(subnode, REQUESTEDSAMPLINGINTERVAL, ctx -> sh.getRequestedSamplingInterval());
		namespace.setObjectNodeComponent(subnode, REVISEDSAMPLINGINTERVAL, ctx -> sh.getRevisedSamplingInterval());
		namespace.setObjectNodeComponent(subnode, REQUESTEDQUEUESIZE, ctx -> sh.getRequestedQueueSize());
		namespace.setObjectNodeComponent(subnode, REVISEDQUEUESIZE, ctx -> sh.getRevisedQueueSize());
	}

	/**
	 * Looks up the monitored item of one node on demand and exposes selected
	 * properties of it as {@link DataValue}s. Every getter yields a value with
	 * status {@link StatusCode#BAD} when no monitored item is found.
	 */
	@AllArgsConstructor
	private static class SubscriptionHandler {
		private static final DataValue BAD_RESULT = DataValue.newValue().setStatus(StatusCode.BAD).build();

		private final MiloSource source;
		private final NodeId id;

		private DataValue getStatus() {
			return read(mi -> mi.getStatusCode().toString());
		}

		private DataValue getRequestedSamplingInterval() {
			return read(mi -> mi.getRequestedSamplingInterval());
		}

		private DataValue getRevisedSamplingInterval() {
			return read(mi -> mi.getRevisedSamplingInterval());
		}

		private DataValue getRequestedQueueSize() {
			return read(mi -> mi.getRequestedQueueSize());
		}

		private DataValue getRevisedQueueSize() {
			return read(mi -> mi.getRevisedQueueSize());
		}

		/**
		 * Wraps the property selected by {@code extractor} in a {@link DataValue}.
		 *
		 * @param extractor reads the wanted property from the monitored item
		 * @return the wrapped property, or {@link #BAD_RESULT} without a monitored item
		 */
		private DataValue read(Function<UaMonitoredItem, Object> extractor) {
			UaMonitoredItem item = findMonitoredItem();
			if (item == null) {
				return BAD_RESULT;
			}
			return new DataValue(new Variant(extractor.apply(item)));
		}

		/**
		 * Searches the subscription result of the source's client for the monitored
		 * item that reads this handler's node.
		 *
		 * @return the monitored item, or {@code null} if there is no subscription
		 *         result or no matching item
		 */
		private UaMonitoredItem findMonitoredItem() {
			var sr = source.getClient().getSubscriptionResult(id);
			if (sr == null) {
				return null;
			}
			var expandedId = id.expanded();
			for (var mi : sr.getMonitoredItems()) {
				if (mi.getReadValueId().getNodeId().equalTo(expandedId)) {
					return mi;
				}
			}
			return null;
		}
	}

}