View Javadoc
1   package de.dlr.bt.stc.source.opcua;
2   
3   import java.time.Instant;
4   import java.time.temporal.ChronoUnit;
5   import java.util.Date;
6   import java.util.Map.Entry;
7   import java.util.Objects;
8   import java.util.concurrent.Executors;
9   import java.util.concurrent.ScheduledExecutorService;
10  import java.util.concurrent.ScheduledFuture;
11  import java.util.concurrent.TimeUnit;
12  
13  import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
14  import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
15  import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
16  import org.greenrobot.eventbus.EventBus;
17  
18  import com.google.common.base.Strings;
19  
20  import de.dlr.bt.stc.eventbus.DataAvailableEvent;
21  import de.dlr.bt.stc.exceptions.SourceConfigurationException;
22  import de.dlr.bt.stc.exceptions.SourceException;
23  import de.dlr.bt.stc.source.ISource;
24  import de.dlr.bt.stc.source.opcua.NodeItems.NodeItem;
25  import de.dlr.bt.stc.source.opcua.SourceOPCUACfg.CaptureMode;
26  import lombok.Getter;
27  import lombok.Setter;
28  import lombok.ToString;
29  import lombok.extern.slf4j.Slf4j;
30  
31  @ToString(onlyExplicitlyIncluded = true)
32  @Slf4j
33  public class MiloSource implements ISource {
34  	private static final int MSEC_TO_NSEC = 1000000;
35  	/**
36  	 * Interval in milliseconds after which all subscriptions are analyzed if a
37  	 * value must be actively polled if polling interval is configured for
38  	 * subscriptions
39  	 */
40  	private static final int DELAYED_SUBSCRIPTION_POLL_INTERVAL = 500;
41  
42  	@ToString.Include
43  	private final SourceOPCUACfg config;
44  	@ToString.Include
45  	@Getter
46  	@Setter
47  	private MiloClient client = null;
48  
49  	private ScheduledFuture<?> pollingTask = null;
50  	private ScheduledExecutorService executorService = null;
51  
52  	private final EventBus eventBus;
53  
54  	@ToString.Include
55  	private NodeItems nodes = new NodeItems();
56  
57  	@ToString.Include
58  	private boolean initialized = false;
59  
60  	public MiloSource(SourceOPCUACfg config, EventBus eventBus) {
61  		this.config = Objects.requireNonNull(config);
62  		this.eventBus = eventBus;
63  
64  	}
65  
66  	@Override
67  	public void initializeTask() throws SourceConfigurationException {
68  		if (client == null)
69  			throw new SourceConfigurationException("MiloClient has not (yet) been set");
70  
71  		// Check whether a single node id or a path has been selected
72  		if (!Strings.isNullOrEmpty(config.getNodeId())) {
73  			var id = MiloClient.getNodeId(config.getNodeId());
74  			try {
75  
76  				var res = client.readNodeValue(id);
77  
78  				if (res.getStatusCode() == null || !res.getStatusCode().isGood())
79  					log.warn("Initial read of MiloSource node {} did not succeed: {}", config.getNodeId(),
80  							res.getStatusCode());
81  
82  			} catch (SourceException se) {
83  				log.warn("Initial read of MiloSource node {} did not succeed, exception: {}", config.getNodeId(), se);
84  			}
85  			nodes.put(id, new NodeItem());
86  		} else if (!Strings.isNullOrEmpty(config.getNodePath())) {
87  			nodes.putAll(client.resolveNodePath(config.getNodePath()));
88  		} else
89  			throw new SourceConfigurationException("Neither NodeId nor NodePath have been configured");
90  
91  		initialized = true;
92  	}
93  
94  	@Override
95  	public void startTask() throws SourceException {
96  		if (!initialized)
97  			throw new SourceException("Source not initialized!");
98  
99  		boolean startPollingThread = false;
100 
101 		if (config.getCaptureMode() == CaptureMode.SUBSCRIPTION) {
102 			// register subscriptions
103 			var subParams = new SubscriptionParameters(config.getSamplingInterval(), config.getPublishingInterval(),
104 					config.getQueueSize());
105 			client.registerNodeSubscription(this, subParams, nodes.keySet());
106 
107 			if (config.getPollingInterval() != null && config.getPollingInterval() > 0)
108 				startPollingThread = true;
109 
110 		} else {
111 			startPollingThread = true;
112 		}
113 
114 		if (startPollingThread) {
115 			// Start polling thread
116 			executorService = Executors.newSingleThreadScheduledExecutor();
117 			pollingTask = executorService.scheduleAtFixedRate(this::poll, 0, DELAYED_SUBSCRIPTION_POLL_INTERVAL,
118 					TimeUnit.MILLISECONDS);
119 		}
120 
121 	}
122 
123 	@Override
124 	public synchronized void stopTask() {
125 		if (pollingTask != null) {
126 			pollingTask.cancel(false);
127 			pollingTask = null;
128 		}
129 
130 		if (executorService != null) {
131 			executorService.shutdown();
132 		}
133 		initialized = false;
134 	}
135 
136 	@Override
137 	public synchronized void joinTask() throws InterruptedException {
138 		if (executorService != null) {
139 			executorService.awaitTermination(5, TimeUnit.SECONDS);
140 			executorService = null;
141 		}
142 	}
143 
144 	public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
145 		var nodeItem = nodes.get(item.getReadValueId().getNodeId());
146 		if (nodeItem != null) {
147 			nodeItem.setLastValue(value);
148 			if (value.getStatusCode() != null && value.getStatusCode().isGood()) {
149 
150 				eventBus.post(new DataAvailableEvent(config.getId(), value.getValue().getValue(), getSourceTime(value),
151 						nodeItem.getVarMap()));
152 				nodeItem.setLastUpdate(Instant.now());
153 			}
154 		}
155 
156 	}
157 
158 	private synchronized void poll() {
159 		for (var node : nodes.entrySet()) {
160 			if (config.getCaptureMode() == CaptureMode.POLLING || (ChronoUnit.MILLIS
161 					.between(node.getValue().getLastUpdate(), Instant.now()) >= config.getPollingInterval())) {
162 				pollNode(node);
163 				node.getValue().setLastUpdate(Instant.now());
164 			}
165 		}
166 
167 	}
168 
169 	private void pollNode(Entry<NodeId, NodeItem> node) {
170 		try {
171 			var dv = client.readNodeValue(node.getKey());
172 			node.getValue().setLastValue(dv);
173 			if (dv.getStatusCode() != null && dv.getStatusCode().isGood()) {
174 				eventBus.post(new DataAvailableEvent(config.getId(), dv.getValue().getValue(), getSourceTime(dv),
175 						node.getValue().getVarMap()));
176 			}
177 
178 		} catch (SourceException se) {
179 			log.warn("Could not poll node value.");
180 		}
181 	}
182 
183 	private static long getSourceTime(DataValue dv) {
184 		var sourceTime = dv.getSourceTime();
185 		if (sourceTime != null)
186 			return sourceTime.getJavaTime() * MSEC_TO_NSEC;
187 
188 		var serverTime = dv.getServerTime();
189 		if (serverTime != null)
190 			return serverTime.getJavaTime() * MSEC_TO_NSEC;
191 		return new Date().getTime() * MSEC_TO_NSEC;
192 	}
193 
194 	public SourceOPCUACfg getConfig() {
195 		return config;
196 	}
197 
198 	public NodeItems getNodes() {
199 		return nodes;
200 	}
201 }