View Javadoc
1   package de.dlr.bt.stc.source.opcua;
2   
3   import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
4   
5   import java.io.File;
6   import java.net.URI;
7   import java.net.URISyntaxException;
8   import java.util.ArrayList;
9   import java.util.Arrays;
10  import java.util.Collection;
11  import java.util.Collections;
12  import java.util.HashMap;
13  import java.util.HashSet;
14  import java.util.List;
15  import java.util.Map;
16  import java.util.Objects;
17  import java.util.Optional;
18  import java.util.Set;
19  import java.util.concurrent.ExecutionException;
20  
21  import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
22  import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
23  import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
24  import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
25  import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
26  import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem.ValueConsumer;
27  import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
28  import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager.SubscriptionListener;
29  import org.eclipse.milo.opcua.stack.core.AttributeId;
30  import org.eclipse.milo.opcua.stack.core.Identifiers;
31  import org.eclipse.milo.opcua.stack.core.UaException;
32  import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
33  import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
34  import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
35  import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
36  import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
37  import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
38  import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
39  import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
40  import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
41  import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
42  import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
43  import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
44  import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
45  import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
46  import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
47  import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
48  import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
49  import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
50  
51  import com.google.common.base.Strings;
52  
53  import de.dlr.bt.stc.exceptions.SourceConfigurationException;
54  import de.dlr.bt.stc.exceptions.SourceException;
55  import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
56  import lombok.Data;
57  import lombok.ToString;
58  import lombok.Value;
59  import lombok.extern.slf4j.Slf4j;
60  
61  @ToString(onlyExplicitlyIncluded = true)
62  @Slf4j
63  public class MiloClient {
64  	@ToString.Include
65  	private MiloClientCfg config;
66  
67  	private OpcUaClient opcuaclient;
68  
69  	public MiloClient(MiloClientCfg config) throws SourceConfigurationException {
70  		this.config = Objects.requireNonNull(config);
71  
72  		final IdentityProvider ip;
73  		if (config.getCredentials() == null || Strings.isNullOrEmpty(config.getCredentials().getUsername()))
74  			ip = new AnonymousProvider();
75  		else
76  			ip = new UsernameProvider(config.getCredentials().getUsername(), config.getCredentials().getPassword());
77  
78  		final KeyStoreLoader keystoreLoader;
79  		if (config.getKeystorePath() != null && !config.getKeystorePath().isBlank()) {
80  			keystoreLoader = new KeyStoreLoader();
81  			keystoreLoader.load(new File(config.getKeystorePath()), config.isCreateKeystore());
82  		} else {
83  			keystoreLoader = null;
84  		}
85  
86  		try {
87  			var securitypolicy = config.getSecurityPolicy();
88  			var endpointuri = new URI(config.getEndpoint());
89  
90  			opcuaclient = OpcUaClient.create(config.getEndpoint(), endpoint -> {
91  				var exactep = endpoint.stream().filter(ep -> {
92  					try {
93  						var epuri = new URI(ep.getEndpointUrl());
94  
95  						return epuri.getScheme().equals(endpointuri.getScheme())
96  								&& epuri.getAuthority().equalsIgnoreCase(endpointuri.getAuthority());
97  					} catch (URISyntaxException e) {
98  						return false;
99  					}
100 				}).filter(ep -> ep.getSecurityPolicyUri().equals(securitypolicy.getUri())).findFirst();
101 
102 				if (exactep.isPresent())
103 					return exactep;
104 
105 				Optional<EndpointDescription> fep = endpoint.stream().findFirst();
106 				if (fep.isPresent()) {
107 					EndpointDescription mod = EndpointUtil.updateUrl(fep.get(), endpointuri.getHost());
108 					return Optional.of(mod);
109 				} else
110 					return fep;
111 			}, uaconfig -> {
112 				var cfgb = uaconfig.setApplicationName(LocalizedText.english("Shepard Timeseries Connector"))
113 						.setApplicationUri("urn:idms:stc").setIdentityProvider(ip);
114 				if (keystoreLoader != null) {
115 					cfgb.setCertificate(keystoreLoader.getClientCertificate());
116 					cfgb.setKeyPair(keystoreLoader.getClientKeyPair());
117 				}
118 
119 				return cfgb.build();
120 
121 			});
122 
123 			opcuaclient.connect().get();
124 
125 			opcuaclient.getSubscriptionManager().addSubscriptionListener(new SubscriptionListener() {
126 				@Override
127 				public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
128 					log.info("Recreating subscriptions");
129 
130 					for (var entry : subscriptions.entrySet()) {
131 						if (subscription != null && entry.getValue() != null
132 								&& entry.getValue().getSubscriptionResult() != null
133 								&& subscription.equals(entry.getValue().getSubscriptionResult().getSubscription())) {
134 							entry.getValue().setSubscriptionResult(null);
135 						}
136 					}
137 					updateSubscriptions();
138 				}
139 
140 				@Override
141 				public void onSubscriptionWatchdogTimerElapsed(UaSubscription subscription) {
142 					log.info("Subscription timed out, will be recreated automatically if possible");
143 				};
144 			});
145 		} catch (URISyntaxException urise) {
146 			throw new SourceConfigurationException(urise);
147 		} catch (InterruptedException ie) {
148 			Thread.currentThread().interrupt();
149 		} catch (ExecutionException | UaException e) {
150 			throw new SourceConfigurationException(e);
151 		}
152 	}
153 
154 	public static NodeId getNodeId(String nodeIdStr) {
155 		return NodeId.parseOrNull(nodeIdStr);
156 	}
157 
158 	public NodeItems resolveNodePath(String nodePath) throws SourceConfigurationException {
159 
160 		var split = nodePath.split("/");
161 
162 		try {
163 			return browseNodePath(split, Identifiers.RootFolder, Collections.emptyMap());
164 
165 		} catch (InterruptedException ie) {
166 			Thread.currentThread().interrupt();
167 			throw new SourceConfigurationException(ie);
168 		} catch (ExecutionException e) {
169 			throw new SourceConfigurationException(e);
170 		}
171 
172 	}
173 
174 	private NodeItems browseNodePath(String[] nodePath, NodeId browseFolder, Map<String, String> variableMap)
175 			throws InterruptedException, ExecutionException, SourceConfigurationException {
176 		// No element left in node-path, should not happen?
177 		if (nodePath.length < 1)
178 			return new NodeItems();
179 
180 		String currentNodeElement = nodePath[0].trim();
181 
182 		// Depth of the node path is still more than one, only browse for sub-folders
183 		final var isIntermediate = nodePath.length > 1;
184 
185 		UInteger nodeClasses = isIntermediate ? uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue())
186 				: uint(NodeClass.Variable.getValue());
187 
188 		BrowseDescription bd = new BrowseDescription(browseFolder, BrowseDirection.Forward, Identifiers.References,
189 				true, nodeClasses, uint(BrowseResultMask.All.getValue()));
190 
191 		var browseResult = opcuaclient.browse(bd).get();
192 		StatusCode brsc = browseResult.getStatusCode();
193 		if (brsc == null || !brsc.isGood())
194 			throw new SourceConfigurationException(
195 					"Could not browse OPC/UA Server, got StatusCode: " + ((brsc != null) ? brsc.toString() : "null"));
196 
197 		NodeItems foundNodes = new NodeItems();
198 
199 		if (currentNodeElement.startsWith("{{") && currentNodeElement.endsWith("}}")) {
200 			// Use template
201 			var template = currentNodeElement.substring(2, currentNodeElement.length() - 2);
202 			var templateParts = template.split("\\|");
203 			if (templateParts.length != 2)
204 				throw new SourceConfigurationException(
205 						"Template is invalid, need exactly two parts! Template: " + template);
206 
207 			var matchRegex = templateParts[0].trim();
208 			var variableName = templateParts[1].trim();
209 
210 			for (var ref : browseResult.getReferences()) {
211 				var reftext = ref.getDisplayName().getText();
212 				if (reftext != null && reftext.matches(matchRegex)) {
213 					var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
214 					if (nodeId.isPresent()) {
215 						var newVariableMap = new HashMap<>(variableMap);
216 						newVariableMap.put(variableName, reftext);
217 
218 						foundNodes.put(nodeId.get(), new NodeItem(newVariableMap));
219 					}
220 				}
221 			}
222 
223 		} else {
224 			// directly use DisplayName
225 			for (var ref : browseResult.getReferences()) {
226 				String text = ref.getDisplayName().getText();
227 				if (text != null && text.equals(currentNodeElement)) {
228 					var nodeId = ref.getNodeId().toNodeId(opcuaclient.getNamespaceTable());
229 					if (nodeId.isPresent())
230 						foundNodes.put(nodeId.get(), new NodeItem(variableMap));
231 				}
232 			}
233 		}
234 
235 		if (isIntermediate) {
236 			NodeItems resultList = new NodeItems();
237 			for (var nodeItem : foundNodes.entrySet()) {
238 				resultList.putAll(
239 						browseNodePath(getArrayTail(nodePath), nodeItem.getKey(), nodeItem.getValue().getVarMap()));
240 			}
241 			return resultList;
242 		} else {
243 			return foundNodes;
244 		}
245 
246 	}
247 
248 	private static String[] getArrayTail(String[] nodePath) {
249 		if (nodePath.length < 1)
250 			return new String[0];
251 
252 		return Arrays.copyOfRange(nodePath, 1, nodePath.length);
253 	}
254 
255 	private final Map<NodeId, NodeSubscriptionData> subscriptions = new HashMap<>();
256 
257 	public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters, NodeId nodeId) {
258 		subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
259 	}
260 
261 	public void registerNodeSubscription(MiloSource source, SubscriptionParameters parameters,
262 			Collection<NodeId> nodeIds) {
263 		for (var nodeId : nodeIds)
264 			subscriptions.put(nodeId, new NodeSubscriptionData(source, parameters));
265 	}
266 
267 	private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
268 		var src = subscriptions.get(item.getReadValueId().getNodeId());
269 		if (src != null && src.getSource() != null)
270 			src.getSource().onSubscriptionValue(item, value);
271 	}
272 
273 	public void updateSubscriptions() {
274 		final Map<SubscriptionParameters, Collection<NodeId>> parameters = new HashMap<>();
275 
276 		for (var entry : subscriptions.entrySet()) {
277 			if (entry.getValue().getSubscriptionResult() == null) {
278 				var nodeIds = parameters.computeIfAbsent(entry.getValue().getSubscriptionParameters(),
279 						pararm -> new ArrayList<>());
280 				nodeIds.add(entry.getKey());
281 			}
282 		}
283 
284 		for (var entry : parameters.entrySet()) {
285 			try {
286 				var sr = subscribeNode(entry.getValue(), entry.getKey(), this::onSubscriptionValue);
287 
288 				for (var nodeid : entry.getValue())
289 					subscriptions.get(nodeid).setSubscriptionResult(sr);
290 
291 			} catch (SourceException ex) {
292 				log.warn("Could not create subscription {}", ex);
293 			}
294 		}
295 
296 	}
297 
298 	public SubscriptionResult getSubscriptionResult(NodeId nodeId) {
299 		var nsd = subscriptions.get(nodeId);
300 		if (nsd != null)
301 			return nsd.getSubscriptionResult();
302 		return null;
303 	}
304 
305 	private SubscriptionResult subscribeNode(Collection<NodeId> nodes, SubscriptionParameters cfg,
306 			ValueConsumer consumer) throws SourceException {
307 		try {
308 			var subscription = opcuaclient.getSubscriptionManager().createSubscription(cfg.getPublishingInterval())
309 					.get();
310 
311 			List<MonitoredItemCreateRequest> monitoredItemCreateRequests = new ArrayList<>();
312 			for (var nodeId : nodes) {
313 				UInteger clientHandle = subscription.nextClientHandle();
314 
315 				MonitoringParameters parameters = new MonitoringParameters(clientHandle, cfg.getSamplingInterval(),
316 						null, uint(cfg.getQueueSize()), true);
317 				var readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
318 				var monitoredItemCreateRequest = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting,
319 						parameters);
320 				monitoredItemCreateRequests.add(monitoredItemCreateRequest);
321 			}
322 
323 			UaSubscription.ItemCreationCallback onItemCreated = (item, id) -> item.setValueConsumer(consumer);
324 
325 			List<UaMonitoredItem> items = subscription
326 					.createMonitoredItems(TimestampsToReturn.Both, monitoredItemCreateRequests, onItemCreated).get();
327 
328 			return new SubscriptionResult(subscription, items);
329 
330 		} catch (ExecutionException ex) {
331 			throw new SourceException(ex);
332 		} catch (InterruptedException ex) {
333 			Thread.currentThread().interrupt();
334 			throw new SourceException(ex);
335 		}
336 	}
337 
338 	public void removeSubscriptions() {
339 		Set<SubscriptionResult> results = new HashSet<>();
340 		for (var nsd : subscriptions.values()) {
341 			if (nsd.getSubscriptionResult() != null)
342 				results.add(nsd.getSubscriptionResult());
343 			nsd.setSubscriptionResult(null);
344 		}
345 
346 		for (var result : results)
347 			removeSubscription(result);
348 
349 	}
350 
351 	private void removeSubscription(SubscriptionResult sr) {
352 		sr.getSubscription().deleteMonitoredItems(sr.getMonitoredItems());
353 
354 		opcuaclient.getSubscriptionManager().deleteSubscription(sr.getSubscription().getSubscriptionId());
355 	}
356 
357 	@Value
358 	public static class SubscriptionResult {
359 		private UaSubscription subscription;
360 		private List<UaMonitoredItem> monitoredItems;
361 	}
362 
363 	public StatusCode writeNodeValue(NodeId node, Object value) {
364 		try {
365 			DataValue dv = new DataValue(new Variant(value), null, null);
366 
367 			return opcuaclient.writeValue(node, dv).get();
368 		} catch (ExecutionException e) {
369 			return StatusCode.BAD;
370 		} catch (InterruptedException ie) {
371 			Thread.currentThread().interrupt();
372 			return StatusCode.BAD;
373 		}
374 	}
375 
376 	public DataValue readNodeValue(NodeId node) throws SourceException {
377 		try {
378 			return opcuaclient.readValue(0, TimestampsToReturn.Both, node).get();
379 		} catch (InterruptedException e) {
380 			Thread.currentThread().interrupt();
381 			throw new SourceException("Unable to read OPC/UA node", e);
382 		} catch (ExecutionException e) {
383 			throw new SourceException("Unable to read OPC/UA node", e);
384 		}
385 	}
386 
387 	public void cleanup() {
388 		try {
389 			opcuaclient.disconnect().get();
390 		} catch (InterruptedException e) {
391 			Thread.currentThread().interrupt();
392 		} catch (ExecutionException e) {
393 			// do nothing
394 		}
395 
396 	}
397 
398 	@Data
399 	private static class NodeSubscriptionData {
400 		private final MiloSource source;
401 		private final SubscriptionParameters subscriptionParameters;
402 		private SubscriptionResult subscriptionResult = null;
403 	}
404 }