// MiloSource.java
package de.dlr.bt.stc.source.opcua;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.greenrobot.eventbus.EventBus;
import com.google.common.base.Strings;
import de.dlr.bt.stc.eventbus.DataAvailableEvent;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import de.dlr.bt.stc.exceptions.SourceException;
import de.dlr.bt.stc.source.ISource;
import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
import de.dlr.bt.stc.source.opcua.SourceOPCUACfg.CaptureMode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@ToString(onlyExplicitlyIncluded = true)
@Slf4j
public class MiloSource implements ISource {
private static final int MSEC_TO_NSEC = 1000000;
/**
* Interval in milliseconds after which all subscriptions are analyzed if a
* value must be actively polled if polling interval is configured for
* subscriptions
*/
private static final int DELAYED_SUBSCRIPTION_POLL_INTERVAL = 500;
@ToString.Include
private final SourceOPCUACfg config;
@ToString.Include
@Getter
@Setter
private MiloClient client = null;
private ScheduledFuture<?> pollingTask = null;
private ScheduledExecutorService executorService = null;
private final EventBus eventBus;
@ToString.Include
private NodeItems nodes = new NodeItems();
@ToString.Include
private boolean initialized = false;
public MiloSource(SourceOPCUACfg config, EventBus eventBus) {
this.config = Objects.requireNonNull(config);
this.eventBus = eventBus;
}
@Override
public void initializeTask() throws SourceConfigurationException {
if (client == null)
throw new SourceConfigurationException("MiloClient has not (yet) been set");
// Check whether a single node id or a path has been selected
if (!Strings.isNullOrEmpty(config.getNodeId())) {
var id = MiloClient.getNodeId(config.getNodeId());
try {
var res = client.readNodeValue(id);
if (res.getStatusCode() == null || !res.getStatusCode().isGood())
log.warn("Initial read of MiloSource node {} did not succeed: {}", config.getNodeId(),
res.getStatusCode());
} catch (SourceException se) {
log.warn("Initial read of MiloSource node {} did not succeed, exception: {}", config.getNodeId(), se);
}
nodes.put(id, new NodeItem());
} else if (!Strings.isNullOrEmpty(config.getNodePath())) {
nodes.putAll(client.resolveNodePath(config.getNodePath()));
} else
throw new SourceConfigurationException("Neither NodeId nor NodePath have been configured");
initialized = true;
}
@Override
public void startTask() throws SourceException {
if (!initialized)
throw new SourceException("Source not initialized!");
boolean startPollingThread = false;
if (config.getCaptureMode() == CaptureMode.SUBSCRIPTION) {
// register subscriptions
var subParams = new SubscriptionParameters(config.getSamplingInterval(), config.getPublishingInterval(),
config.getQueueSize());
client.registerNodeSubscription(this, subParams, nodes.keySet());
if (config.getPollingInterval() != null && config.getPollingInterval() > 0)
startPollingThread = true;
} else {
startPollingThread = true;
}
if (startPollingThread) {
// Start polling thread
executorService = Executors.newSingleThreadScheduledExecutor();
pollingTask = executorService.scheduleAtFixedRate(this::poll, 0, DELAYED_SUBSCRIPTION_POLL_INTERVAL,
TimeUnit.MILLISECONDS);
}
}
@Override
public synchronized void stopTask() {
if (pollingTask != null) {
pollingTask.cancel(false);
pollingTask = null;
}
if (executorService != null) {
executorService.shutdown();
}
initialized = false;
}
@Override
public synchronized void joinTask() throws InterruptedException {
if (executorService != null) {
executorService.awaitTermination(5, TimeUnit.SECONDS);
executorService = null;
}
}
public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
var nodeItem = nodes.get(item.getReadValueId().getNodeId());
if (nodeItem != null) {
nodeItem.setLastValue(value);
if (value.getStatusCode() != null && value.getStatusCode().isGood()) {
eventBus.post(new DataAvailableEvent(config.getId(), value.getValue().getValue(), getSourceTime(value),
nodeItem.getVarMap()));
nodeItem.setLastUpdate(Instant.now());
}
}
}
private synchronized void poll() {
for (var node : nodes.entrySet()) {
if (config.getCaptureMode() == CaptureMode.POLLING || (ChronoUnit.MILLIS
.between(node.getValue().getLastUpdate(), Instant.now()) >= config.getPollingInterval())) {
pollNode(node);
node.getValue().setLastUpdate(Instant.now());
}
}
}
private void pollNode(Entry<NodeId, NodeItem> node) {
try {
var dv = client.readNodeValue(node.getKey());
node.getValue().setLastValue(dv);
if (dv.getStatusCode() != null && dv.getStatusCode().isGood()) {
eventBus.post(new DataAvailableEvent(config.getId(), dv.getValue().getValue(), getSourceTime(dv),
node.getValue().getVarMap()));
}
} catch (SourceException se) {
log.warn("Could not poll node value.");
}
}
private static long getSourceTime(DataValue dv) {
var sourceTime = dv.getSourceTime();
if (sourceTime != null)
return sourceTime.getJavaTime() * MSEC_TO_NSEC;
var serverTime = dv.getServerTime();
if (serverTime != null)
return serverTime.getJavaTime() * MSEC_TO_NSEC;
return new Date().getTime() * MSEC_TO_NSEC;
}
public SourceOPCUACfg getConfig() {
return config;
}
public NodeItems getNodes() {
return nodes;
}
}