MiloSource.java

package de.dlr.bt.stc.source.opcua;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.greenrobot.eventbus.EventBus;

import com.google.common.base.Strings;

import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.exceptions.SourceException;
import de.dlr.bt.stc.source.ISource;
import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
import de.dlr.bt.stc.source.opcua.SourceOPCUACfg.CaptureMode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@ToString(onlyExplicitlyIncluded = true)
@Slf4j
public class MiloSource implements ISource {
	private static final int MSEC_TO_NSEC = 1000000;
	/**
	 * Interval in milliseconds after which all subscriptions are analyzed if a
	 * value must be actively polled if polling interval is configured for
	 * subscriptions
	 */
	private static final int DELAYED_SUBSCRIPTION_POLL_INTERVAL = 500;

	@ToString.Include
	private final SourceOPCUACfg config;
	@ToString.Include
	@Getter
	@Setter
	private MiloClient client = null;

	private ScheduledFuture<?> pollingTask = null;
	private ScheduledExecutorService executorService = null;

	private final EventBus eventBus;

	@ToString.Include
	private NodeItems nodes = new NodeItems();

	@ToString.Include
	private boolean initialized = false;

	public MiloSource(SourceOPCUACfg config, EventBus eventBus) {
		this.config = Objects.requireNonNull(config);
		this.eventBus = eventBus;

	}

	@Override
	public void initializeTask() throws SourceConfigurationException {
		if (client == null)
			throw new SourceConfigurationException("MiloClient has not (yet) been set");

		// Check whether a single node id or a path has been selected
		if (!Strings.isNullOrEmpty(config.getNodeId())) {
			var id = MiloClient.getNodeId(config.getNodeId());
			try {

				var res = client.readNodeValue(id);

				if (res.getStatusCode() == null || !res.getStatusCode().isGood())
					log.warn("Initial read of MiloSource node {} did not succeed: {}", config.getNodeId(),
							res.getStatusCode());

			} catch (SourceException se) {
				log.warn("Initial read of MiloSource node {} did not succeed, exception: {}", config.getNodeId(), se);
			}
			nodes.put(id, new NodeItem());
		} else if (!Strings.isNullOrEmpty(config.getNodePath())) {
			nodes.putAll(client.resolveNodePath(config.getNodePath()));
		} else
			throw new SourceConfigurationException("Neither NodeId nor NodePath have been configured");

		initialized = true;
	}

	@Override
	public void startTask() throws SourceException {
		if (!initialized)
			throw new SourceException("Source not initialized!");

		boolean startPollingThread = false;

		if (config.getCaptureMode() == CaptureMode.SUBSCRIPTION) {
			// register subscriptions
			var subParams = new SubscriptionParameters(config.getSamplingInterval(), config.getPublishingInterval(),
					config.getQueueSize());
			client.registerNodeSubscription(this, subParams, nodes.keySet());

			if (config.getPollingInterval() != null && config.getPollingInterval() > 0)
				startPollingThread = true;

		} else {
			startPollingThread = true;
		}

		if (startPollingThread) {
			// Start polling thread
			executorService = Executors.newSingleThreadScheduledExecutor();
			pollingTask = executorService.scheduleAtFixedRate(this::poll, 0, DELAYED_SUBSCRIPTION_POLL_INTERVAL,
					TimeUnit.MILLISECONDS);
		}

	}

	@Override
	public synchronized void stopTask() {
		if (pollingTask != null) {
			pollingTask.cancel(false);
			pollingTask = null;
		}

		if (executorService != null) {
			executorService.shutdown();
		}
		initialized = false;
	}

	@Override
	public synchronized void joinTask() throws InterruptedException {
		if (executorService != null) {
			executorService.awaitTermination(5, TimeUnit.SECONDS);
			executorService = null;
		}
	}

	public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
		var nodeItem = nodes.get(item.getReadValueId().getNodeId());
		if (nodeItem != null) {
			nodeItem.setLastValue(value);
			if (value.getStatusCode() != null && value.getStatusCode().isGood()) {

				eventBus.post(new DataAvailableEvent(config.getId(), value.getValue().getValue(), getSourceTime(value),
						nodeItem.getVarMap()));
				nodeItem.setLastUpdate(Instant.now());
			}
		}

	}

	private synchronized void poll() {
		for (var node : nodes.entrySet()) {
			if (config.getCaptureMode() == CaptureMode.POLLING || (ChronoUnit.MILLIS
					.between(node.getValue().getLastUpdate(), Instant.now()) >= config.getPollingInterval())) {
				pollNode(node);
				node.getValue().setLastUpdate(Instant.now());
			}
		}

	}

	private void pollNode(Entry<NodeId, NodeItem> node) {
		try {
			var dv = client.readNodeValue(node.getKey());
			node.getValue().setLastValue(dv);
			if (dv.getStatusCode() != null && dv.getStatusCode().isGood()) {
				eventBus.post(new DataAvailableEvent(config.getId(), dv.getValue().getValue(), getSourceTime(dv),
						node.getValue().getVarMap()));
			}

		} catch (SourceException se) {
			log.warn("Could not poll node value.");
		}
	}

	private static long getSourceTime(DataValue dv) {
		var sourceTime = dv.getSourceTime();
		if (sourceTime != null)
			return sourceTime.getJavaTime() * MSEC_TO_NSEC;

		var serverTime = dv.getServerTime();
		if (serverTime != null)
			return serverTime.getJavaTime() * MSEC_TO_NSEC;
		return new Date().getTime() * MSEC_TO_NSEC;
	}

	public SourceOPCUACfg getConfig() {
		return config;
	}

	public NodeItems getNodes() {
		return nodes;
	}
}