RSIClient.java

package de.dlr.bt.stc.source.rsi;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import de.dlr.bt.stc.exceptions.SourceConfigurationException;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RSIClient {
	private static final int BUFFER_SIZE = 4096;

	private final DatagramChannel channel;
	private final ScheduledExecutorService executor;

	private final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

	private final List<Consumer<String>> messageListener = new ArrayList<>();

	public RSIClient(RSIClientCfg config) throws SourceConfigurationException {
		this.executor = Executors.newSingleThreadScheduledExecutor();
		try {

			channel = DatagramChannel.open();
			channel.bind(new InetSocketAddress(config.getPort()));
			channel.configureBlocking(false);
		} catch (IOException e) {
			throw new SourceConfigurationException(e);
		}
		this.executor.scheduleAtFixedRate(this::receiveData, 0, 1, TimeUnit.MILLISECONDS);
	}

	private void receiveData() {
		try {
			var result = channel.receive(buffer);
			if (result != null) {
				buffer.flip();
				byte[] bytes = new byte[buffer.remaining()];
				buffer.get(bytes);
				String message = new String(bytes);
				for (var listener : messageListener)
					listener.accept(message);
			}
		} catch (IOException ioe) {
			log.error("Error while receiving RSI data from UDP socket: {}", ioe);
		}
	}

	public void registerMessageListener(Consumer<String> listener) {
		messageListener.add(listener);
	}

	public void removeMessageListener(Consumer<String> listener) {
		messageListener.remove(listener);
	}

	public void cleanup() {
		messageListener.clear();

		this.executor.shutdown();

		try {
			if (channel.isOpen())
				channel.close();
		} catch (IOException ex) {
			log.error("Could not close channel {}: {}", channel, ex);
		}
	}
}