1 package de.dlr.bt.stc.source.opcua;
2
3 import java.time.Instant;
4 import java.time.temporal.ChronoUnit;
5 import java.util.Date;
6 import java.util.Map.Entry;
7 import java.util.Objects;
8 import java.util.concurrent.Executors;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.ScheduledFuture;
11 import java.util.concurrent.TimeUnit;
12
13 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
14 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
15 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
16 import org.greenrobot.eventbus.EventBus;
17
18 import com.google.common.base.Strings;
19
20 import de.dlr.bt.stc.eventbus.DataAvailableEvent;
21 import de.dlr.bt.stc.exceptions.SourceConfigurationException;
22 import de.dlr.bt.stc.exceptions.SourceException;
23 import de.dlr.bt.stc.source.ISource;
24 import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
25 import de.dlr.bt.stc.source.opcua.SourceOPCUACfg.CaptureMode;
26 import lombok.Getter;
27 import lombok.Setter;
28 import lombok.ToString;
29 import lombok.extern.slf4j.Slf4j;
30
31 @ToString(onlyExplicitlyIncluded = true)
32 @Slf4j
33 public class MiloSource implements ISource {
34 private static final int MSEC_TO_NSEC = 1000000;
35
36
37
38
39
40 private static final int DELAYED_SUBSCRIPTION_POLL_INTERVAL = 500;
41
42 @ToString.Include
43 private final SourceOPCUACfg config;
44 @ToString.Include
45 @Getter
46 @Setter
47 private MiloClient client = null;
48
49 private ScheduledFuture<?> pollingTask = null;
50 private ScheduledExecutorService executorService = null;
51
52 private final EventBus eventBus;
53
54 @ToString.Include
55 private NodeItems nodes = new NodeItems();
56
57 @ToString.Include
58 private boolean initialized = false;
59
60 public MiloSource(SourceOPCUACfg config, EventBus eventBus) {
61 this.config = Objects.requireNonNull(config);
62 this.eventBus = eventBus;
63
64 }
65
66 @Override
67 public void initializeTask() throws SourceConfigurationException {
68 if (client == null)
69 throw new SourceConfigurationException("MiloClient has not (yet) been set");
70
71
72 if (!Strings.isNullOrEmpty(config.getNodeId())) {
73 var id = MiloClient.getNodeId(config.getNodeId());
74 try {
75
76 var res = client.readNodeValue(id);
77
78 if (res.getStatusCode() == null || !res.getStatusCode().isGood())
79 log.warn("Initial read of MiloSource node {} did not succeed: {}", config.getNodeId(),
80 res.getStatusCode());
81
82 } catch (SourceException se) {
83 log.warn("Initial read of MiloSource node {} did not succeed, exception: {}", config.getNodeId(), se);
84 }
85 nodes.put(id, new NodeItem());
86 } else if (!Strings.isNullOrEmpty(config.getNodePath())) {
87 nodes.putAll(client.resolveNodePath(config.getNodePath()));
88 } else
89 throw new SourceConfigurationException("Neither NodeId nor NodePath have been configured");
90
91 initialized = true;
92 }
93
94 @Override
95 public void startTask() throws SourceException {
96 if (!initialized)
97 throw new SourceException("Source not initialized!");
98
99 boolean startPollingThread = false;
100
101 if (config.getCaptureMode() == CaptureMode.SUBSCRIPTION) {
102
103 var subParams = new SubscriptionParameters(config.getSamplingInterval(), config.getPublishingInterval(),
104 config.getQueueSize());
105 client.registerNodeSubscription(this, subParams, nodes.keySet());
106
107 if (config.getPollingInterval() != null && config.getPollingInterval() > 0)
108 startPollingThread = true;
109
110 } else {
111 startPollingThread = true;
112 }
113
114 if (startPollingThread) {
115
116 executorService = Executors.newSingleThreadScheduledExecutor();
117 pollingTask = executorService.scheduleAtFixedRate(this::poll, 0, DELAYED_SUBSCRIPTION_POLL_INTERVAL,
118 TimeUnit.MILLISECONDS);
119 }
120
121 }
122
123 @Override
124 public synchronized void stopTask() {
125 if (pollingTask != null) {
126 pollingTask.cancel(false);
127 pollingTask = null;
128 }
129
130 if (executorService != null) {
131 executorService.shutdown();
132 }
133 initialized = false;
134 }
135
136 @Override
137 public synchronized void joinTask() throws InterruptedException {
138 if (executorService != null) {
139 executorService.awaitTermination(5, TimeUnit.SECONDS);
140 executorService = null;
141 }
142 }
143
144 public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
145 var nodeItem = nodes.get(item.getReadValueId().getNodeId());
146 if (nodeItem != null) {
147 nodeItem.setLastValue(value);
148 if (value.getStatusCode() != null && value.getStatusCode().isGood()) {
149
150 eventBus.post(new DataAvailableEvent(config.getId(), value.getValue().getValue(), getSourceTime(value),
151 nodeItem.getVarMap()));
152 nodeItem.setLastUpdate(Instant.now());
153 }
154 }
155
156 }
157
158 private synchronized void poll() {
159 for (var node : nodes.entrySet()) {
160 if (config.getCaptureMode() == CaptureMode.POLLING || (ChronoUnit.MILLIS
161 .between(node.getValue().getLastUpdate(), Instant.now()) >= config.getPollingInterval())) {
162 pollNode(node);
163 node.getValue().setLastUpdate(Instant.now());
164 }
165 }
166
167 }
168
169 private void pollNode(Entry<NodeId, NodeItem> node) {
170 try {
171 var dv = client.readNodeValue(node.getKey());
172 node.getValue().setLastValue(dv);
173 if (dv.getStatusCode() != null && dv.getStatusCode().isGood()) {
174 eventBus.post(new DataAvailableEvent(config.getId(), dv.getValue().getValue(), getSourceTime(dv),
175 node.getValue().getVarMap()));
176 }
177
178 } catch (SourceException se) {
179 log.warn("Could not poll node value.");
180 }
181 }
182
183 private static long getSourceTime(DataValue dv) {
184 var sourceTime = dv.getSourceTime();
185 if (sourceTime != null)
186 return sourceTime.getJavaTime() * MSEC_TO_NSEC;
187
188 var serverTime = dv.getServerTime();
189 if (serverTime != null)
190 return serverTime.getJavaTime() * MSEC_TO_NSEC;
191 return new Date().getTime() * MSEC_TO_NSEC;
192 }
193
194 public SourceOPCUACfg getConfig() {
195 return config;
196 }
197
198 public NodeItems getNodes() {
199 return nodes;
200 }
201 }