1 package de.dlr.bt.stc.source.opcua;
2
3 import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
4
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
20
21 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
22 import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
23 import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
24 import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
25 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
26 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem.ValueConsumer;
27 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
28 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager.SubscriptionListener;
29 import org.eclipse.milo.opcua.stack.core.AttributeId;
30 import org.eclipse.milo.opcua.stack.core.Identifiers;
31 import org.eclipse.milo.opcua.stack.core.UaException;
32 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
33 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
34 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
35 import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
36 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
37 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
38 import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
39 import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
40 import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
41 import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
42 import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
43 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
44 import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
45 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
46 import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
47 import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
48 import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
49 import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
50
51 import com.google.common.base.Strings;
52
53 import de.dlr.bt.stc.exceptions.SourceConfigurationException;
54 import de.dlr.bt.stc.exceptions.SourceException;
55 import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
56 import lombok.Data;
57 import lombok.ToString;
58 import lombok.Value;
59 import lombok.extern.slf4j.Slf4j;
60
61 @ToString(onlyExplicitlyIncluded = true)
62 @Slf4j
63 public class MiloClient {
64 @ToString.Include
65 private MiloClientCfg config;
66
67 private OpcUaClient opcuaclient;
68
/**
 * Creates an OPC UA client for the configured endpoint, connects it and installs a
 * subscription listener.
 *
 * <p>Endpoint selection: the first endpoint whose URL has the same scheme and authority
 * (case-insensitive) as the configured endpoint and whose security policy URI equals the
 * configured one is used. If there is none, the first advertised endpoint is used with its
 * URL rewritten to the configured host.
 *
 * <p>The listener clears the stored subscription result of every registration that belongs
 * to a subscription whose transfer failed and then calls {@link #updateSubscriptions()}.
 *
 * @param config client configuration; must not be {@code null}
 * @throws SourceConfigurationException if the endpoint URI is malformed, the client cannot
 *                                      be created or connected, or the calling thread is
 *                                      interrupted while connecting
 */
public MiloClient(MiloClientCfg config) throws SourceConfigurationException {
    this.config = Objects.requireNonNull(config);

    // Use an anonymous session when no user name is configured.
    final IdentityProvider ip;
    if (config.getCredentials() == null || Strings.isNullOrEmpty(config.getCredentials().getUsername())) {
        ip = new AnonymousProvider();
    } else {
        ip = new UsernameProvider(config.getCredentials().getUsername(), config.getCredentials().getPassword());
    }

    // A key store is only loaded when a path is configured.
    final KeyStoreLoader keystoreLoader;
    if (config.getKeystorePath() != null && !config.getKeystorePath().isBlank()) {
        keystoreLoader = new KeyStoreLoader();
        keystoreLoader.load(new File(config.getKeystorePath()), config.isCreateKeystore());
    } else {
        keystoreLoader = null;
    }

    try {
        var securitypolicy = config.getSecurityPolicy();
        var endpointuri = new URI(config.getEndpoint());

        opcuaclient = OpcUaClient.create(config.getEndpoint(), endpoint -> {
            var exactep = endpoint.stream().filter(ep -> {
                // Endpoints without a usable URL can never be an exact match.
                if (ep.getEndpointUrl() == null) {
                    return false;
                }
                try {
                    var epuri = new URI(ep.getEndpointUrl());

                    return epuri.getScheme() != null && epuri.getScheme().equals(endpointuri.getScheme())
                            && epuri.getAuthority() != null
                            && epuri.getAuthority().equalsIgnoreCase(endpointuri.getAuthority());
                } catch (URISyntaxException e) {
                    return false;
                }
            }).filter(ep -> securitypolicy.getUri().equals(ep.getSecurityPolicyUri())).findFirst();

            if (exactep.isPresent()) {
                return exactep;
            }

            // No exact match: take the first advertised endpoint and point it at the configured host.
            Optional<EndpointDescription> fep = endpoint.stream().findFirst();
            if (fep.isPresent()) {
                EndpointDescription mod = EndpointUtil.updateUrl(fep.get(), endpointuri.getHost());
                return Optional.of(mod);
            }
            return fep;
        }, uaconfig -> {
            var cfgb = uaconfig.setApplicationName(LocalizedText.english("Shepard Timeseries Connector"))
                    .setApplicationUri("urn:idms:stc").setIdentityProvider(ip);
            if (keystoreLoader != null) {
                cfgb.setCertificate(keystoreLoader.getClientCertificate());
                cfgb.setKeyPair(keystoreLoader.getClientKeyPair());
            }

            return cfgb.build();
        });

        opcuaclient.connect().get();

        opcuaclient.getSubscriptionManager().addSubscriptionListener(new SubscriptionListener() {
            @Override
            public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
                log.info("Recreating subscriptions");

                // Forget the result of every registration bound to the failed subscription so
                // that updateSubscriptions() creates it again.
                for (var entry : subscriptions.entrySet()) {
                    if (subscription != null && entry.getValue() != null
                            && entry.getValue().getSubscriptionResult() != null
                            && subscription.equals(entry.getValue().getSubscriptionResult().getSubscription())) {
                        entry.getValue().setSubscriptionResult(null);
                    }
                }
                updateSubscriptions();
            }

            @Override
            public void onSubscriptionWatchdogTimerElapsed(UaSubscription subscription) {
                log.info("Subscription timed out, will be recreated automatically if possible");
            }
        });
    } catch (URISyntaxException urise) {
        throw new SourceConfigurationException(urise);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag and fail: returning normally would hand out a client
        // that never finished connecting and has no subscription listener.
        Thread.currentThread().interrupt();
        throw new SourceConfigurationException(ie);
    } catch (ExecutionException | UaException e) {
        throw new SourceConfigurationException(e);
    }
}
153
154 public static NodeId getNodeId(String nodeIdStr) {
155 return NodeId.parseOrNull(nodeIdStr);
156 }
157
158 public NodeItems resolveNodePath(String nodePath) throws SourceConfigurationException {
159
160 var split = nodePath.split("/");
161
162 try {
163 return browseNodePath(split, Identifiers.RootFolder, Collections.emptyMap());
164
165 } catch (InterruptedException ie) {
166 Thread.currentThread().interrupt();
167 throw new SourceConfigurationException(ie);
168 } catch (ExecutionException e) {
169 throw new SourceConfigurationException(e);
170 }
171
172 }
173
174 private NodeItems browseNodePath(String[] nodePath, NodeId browseFolder, Map<String, String> variableMap)
175 throws InterruptedException, ExecutionException, SourceConfigurationException {
176
177 if (nodePath.length < 1)
178 return new NodeItems();
179
180 String currentNodeElement = nodePath[0].trim();
181
182
183 final var isIntermediate = nodePath.length > 1;
184
185 UInteger nodeClasses = isIntermediate ? uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue())
186 : uint(NodeClass.Variable.getValue());
187
188 BrowseDescription bd = new BrowseDescription(browseFolder, BrowseDirection.Forward, Identifiers.References,
189 true, nodeClasses, uint(BrowseResultMask.All.getValue()));
190
191 var browseResult = opcuaclient.browse(bd).get();
192 StatusCode brsc = browseResult.getStatusCode();
193 if (brsc == null || !brsc.isGood())
194 throw new SourceConfigurationException(
195 "Could not browse OPC/UA Server, got StatusCode: " + ((brsc != null) ? brsc.toString() : "null"));
196
197 NodeItems foundNodes = new NodeItems();
198
199 if (currentNodeElement.startsWith("{{") && currentNodeElement.endsWith("}}")) {
200
201 var template = currentNodeElement.substring(2, currentNodeElement.length() - 2);
202 var templateParts = template.split("\\|");
203 if (templateParts.length != 2)
204 throw new SourceConfigurationException(
205 "Template is invalid, need exactly two parts! Template: " + template);
206
207 var matchRegex = templateParts[0].trim();
208 var variableName = templateParts[1].trim();
209
210 for (var ref : browseResult.getReferences()) {
211 var reftext = ref.getDisplayName().getText();
212 if (reftext != null && reftext.matches(matchRegex)) {
213 var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
214 if (nodeId.isPresent()) {
215 var newVariableMap = new HashMap<>(variableMap);
216 newVariableMap.put(variableName, reftext);
217
218 foundNodes.put(nodeId.get(), new NodeItem(newVariableMap));
219 }
220 }
221 }
222
223 } else {
224
225 for (var ref : browseResult.getReferences()) {
226 String text = ref.getDisplayName().getText();
227 if (text != null && text.equals(currentNodeElement)) {
228 var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
229 if (nodeId.isPresent())
230 foundNodes.put(nodeId.get(), new NodeItem(variableMap));
231 }
232 }
233 }
234
235 if (isIntermediate) {
236 NodeItems resultList = new NodeItems();
237 for (var nodeItem : foundNodes.entrySet()) {
238 resultList.putAll(
239 browseNodePath(getArrayTail(nodePath), nodeItem.getKey(), nodeItem.getValue().getVarMap()));
240 }
241 return resultList;
242 } else {
243 return foundNodes;
244 }
245
246 }
247
248 private static String[] getArrayTail(String[] nodePath) {
249 if (nodePath.length < 1)
250 return new String[0];
251
252 return Arrays.copyOfRange(nodePath, 1, nodePath.length);
253 }
254
255 private final Map<NodeId, NodeSubscriptionData> subscriptions = new HashMap<>();
256
257 public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters, NodeId nodeId) {
258 subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
259 }
260
261 public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters,
262 Collection<NodeId> nodeIds) {
263 for (var nodeId : nodeIds)
264 subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
265 }
266
267 private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
268 var src = subscriptions.get(item.getReadValueId().getNodeId());
269 if (src != null && src.getSource() != null)
270 src.getSource().onSubscriptionValue(item, value);
271 }
272
273 public void updateSubscriptions() {
274 final Map<SubscriptionParameters, Collection<NodeId>> parameters = new HashMap<>();
275
276 for (var entry : subscriptions.entrySet()) {
277 if (entry.getValue().getSubscriptionResult() == null) {
278 var nodeIds = parameters.computeIfAbsent(entry.getValue().getSubscriptionParameters(),
279 pararm -> new ArrayList<>());
280 nodeIds.add(entry.getKey());
281 }
282 }
283
284 for (var entry : parameters.entrySet()) {
285 try {
286 var sr = subscribeNode(entry.getValue(), entry.getKey(), this::onSubscriptionValue);
287
288 for (var nodeid : entry.getValue())
289 subscriptions.get(nodeid).setSubscriptionResult(sr);
290
291 } catch (SourceException ex) {
292 log.warn("Could not create subscription {}", ex);
293 }
294 }
295
296 }
297
298 public SubscriptionResult getSubscriptionResult(NodeId nodeId) {
299 var nsd = subscriptions.get(nodeId);
300 if (nsd != null)
301 return nsd.getSubscriptionResult();
302 return null;
303 }
304
305 private SubscriptionResult subscribeNode(Collection<NodeId> nodes, SubscriptionParameters cfg,
306 ValueConsumer consumer) throws SourceException {
307 try {
308 var subscription = opcuaclient.getSubscriptionManager().createSubscription(cfg.getPublishingInterval())
309 .get();
310
311 List<MonitoredItemCreateRequest> monitoredItemCreateRequests = new ArrayList<>();
312 for (var nodeId : nodes) {
313 UInteger clientHandle = subscription.nextClientHandle();
314
315 MonitoringParameters parameters = new MonitoringParameters(clientHandle, cfg.getSamplingInterval(),
316 null, uint(cfg.getQueueSize()), true);
317 var readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
318 var monitoredItemCreateRequest = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting,
319 parameters);
320 monitoredItemCreateRequests.add(monitoredItemCreateRequest);
321 }
322
323 UaSubscription.ItemCreationCallback onItemCreated = (item, id) -> item.setValueConsumer(consumer);
324
325 List<UaMonitoredItem> items = subscription
326 .createMonitoredItems(TimestampsToReturn.Both, monitoredItemCreateRequests, onItemCreated).get();
327
328 return new SubscriptionResult(subscription, items);
329
330 } catch (ExecutionException ex) {
331 throw new SourceException(ex);
332 } catch (InterruptedException ex) {
333 Thread.currentThread().interrupt();
334 throw new SourceException(ex);
335 }
336 }
337
338 public void removeSubscriptions() {
339 Set<SubscriptionResult> results = new HashSet<>();
340 for (var nsd : subscriptions.values()) {
341 if (nsd.getSubscriptionResult() != null)
342 results.add(nsd.getSubscriptionResult());
343 nsd.setSubscriptionResult(null);
344 }
345
346 for (var result : results)
347 removeSubscription(result);
348
349 }
350
351 private void removeSubscription(SubscriptionResult sr) {
352 sr.getSubscription().deleteMonitoredItems(sr.getMonitoredItems());
353
354 opcuaclient.getSubscriptionManager().deleteSubscription(sr.getSubscription().getSubscriptionId());
355 }
356
357 @Value
358 public static class SubscriptionResult {
359 private UaSubscription subscription;
360 private List<UaMonitoredItem> monitoredItems;
361 }
362
363 public StatusCode writeNodeValue(NodeId node, Object value) {
364 try {
365 DataValue dv = new DataValue(new Variant(value), null, null);
366
367 return opcuaclient.writeValue(node, dv).get();
368 } catch (ExecutionException e) {
369 return StatusCode.BAD;
370 } catch (InterruptedException ie) {
371 Thread.currentThread().interrupt();
372 return StatusCode.BAD;
373 }
374 }
375
376 public DataValue readNodeValue(NodeId node) throws SourceException {
377 try {
378 return opcuaclient.readValue(0, TimestampsToReturn.Both, node).get();
379 } catch (InterruptedException e) {
380 Thread.currentThread().interrupt();
381 throw new SourceException("Unable to read OPC/UA node", e);
382 } catch (ExecutionException e) {
383 throw new SourceException("Unable to read OPC/UA node", e);
384 }
385 }
386
387 public void cleanup() {
388 try {
389 opcuaclient.disconnect().get();
390 } catch (InterruptedException e) {
391 Thread.currentThread().interrupt();
392 } catch (ExecutionException e) {
393
394 }
395
396 }
397
398 @Data
399 private static class NodeSubscriptionData {
400 private final MiloSource source;
401 private final SubscriptionParameters subscriptionParameters;
402 private SubscriptionResult subscriptionResult = null;
403 }
404 }