// RSIClient.java
package de.dlr.bt.stc.source.rsi;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RSIClient {
private static final int BUFFER_SIZE = 4096;
private final DatagramChannel channel;
private final ScheduledExecutorService executor;
private final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
private final List<Consumer<String>> messageListener = new ArrayList<>();
public RSIClient(RSIClientCfg config) throws SourceConfigurationException {
this.executor = Executors.newSingleThreadScheduledExecutor();
try {
channel = DatagramChannel.open();
channel.bind(new InetSocketAddress(config.getPort()));
channel.configureBlocking(false);
} catch (IOException e) {
throw new SourceConfigurationException(e);
}
this.executor.scheduleAtFixedRate(this::receiveData, 0, 1, TimeUnit.MILLISECONDS);
}
private void receiveData() {
try {
var result = channel.receive(buffer);
if (result != null) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String message = new String(bytes);
for (var listener : messageListener)
listener.accept(message);
}
} catch (IOException ioe) {
log.error("Error while receiving RSI data from UDP socket: {}", ioe);
}
}
public void registerMessageListener(Consumer<String> listener) {
messageListener.add(listener);
}
public void removeMessageListener(Consumer<String> listener) {
messageListener.remove(listener);
}
public void cleanup() {
messageListener.clear();
this.executor.shutdown();
try {
if (channel.isOpen())
channel.close();
} catch (IOException ex) {
log.error("Could not close channel {}: {}", channel, ex);
}
}
}