// ShepardSink.java

package de.dlr.bt.stc.sink.shepard;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;

import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.sink.ASink;
import de.dlr.shepard.client.java.ApiException;
import de.dlr.shepard.client.java.api.TimeseriesApi;
import de.dlr.shepard.client.java.model.InfluxPoint;
import de.dlr.shepard.client.java.model.Timeseries;
import de.dlr.shepard.client.java.model.TimeseriesPayload;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ShepardSink extends ASink {

	private ShepardApiFactory shepardApiFactory = new ShepardApiFactory();
	private TimeseriesApi timeseriesApi = null;
	@Getter
	private final String host;
	private final String apiKey;
	@Getter
	private final Long timeseriesContainerId;

	private final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

	public ShepardSink(String key, SinkShepardCfg config, EventBus eventBus) {
		super(key, eventBus, config.getField());

		this.host = config.getHost();
		this.apiKey = config.getApiKey();
		this.timeseriesContainerId = config.getTimeseriesContainerId();
	}

	@Override
	public void initializeTask() throws ConfigurationException {
		super.initializeTask();
		timeseriesApi = shepardApiFactory.getTimeseriesApi(host, apiKey);
	}

	@Override
	public void stopTask() {
		super.stopTask();
		executor.shutdown();
	}

	@Override
	public void joinTask() throws InterruptedException {
		try {
			executor.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			log.error("ASink thread was interrupted!");
			Thread.currentThread().interrupt();
		}
	}

	public int getExecutorActiveCount() {
		return executor.getActiveCount();
	}

	public int getExecutorPoolSize() {
		return executor.getPoolSize();
	}

	public int getExecutorLargestPoolSize() {
		return executor.getLargestPoolSize();
	}

	@Override
	protected void handleEvent(CacheFullEvent event) {
		executor.execute(() -> sendToShepard(event));
	}

	private void sendToShepard(CacheFullEvent event) {
		log.info("Received CacheFullEvent in sink {}: {}", key, event.toString());
		if (timeseriesApi == null) {
			log.error("Timeseries Api not yet initialized. Dropping cache full event...");
			return;
		}

		var payload = convertCacheFullEvent(event);
		try {
			timeseriesApi.createTimeseries(timeseriesContainerId, payload);
		} catch (ApiException e) {
			log.error("ApiException in shepard sink {}: {}", key, e.toString());
		} catch (Exception e) {
			log.error("Non-Api Exception in shepard sink {}: {}", key, e.toString());
		}
	}

	private TimeseriesPayload convertCacheFullEvent(CacheFullEvent event) {
		var result = new TimeseriesPayload();

		List<InfluxPoint> points = new ArrayList<>(event.getData().size());
		for (var dp : event.getData()) {
			var value = sanitizeValue(dp.getValue());
			if (value == null) {
				log.warn("value is null, ignoring...");
				continue;
			}
			var ip = new InfluxPoint();
			ip.setTimestamp(dp.getTimestamp());
			ip.setValue(value);
			points.add(ip);
		}
		result.setPoints(points);

		var fieldToSet = decideForField(event.getTimeseries().getField());
		if (fieldToSet == null) {
			log.error("No field configured for timeseries {} in sink {}", event.getTimeseries(), key);
		}

		var timeseries = new Timeseries();
		timeseries.setMeasurement(event.getTimeseries().getMeasurement());
		timeseries.setLocation(event.getTimeseries().getLocation());
		timeseries.setDevice(event.getTimeseries().getDevice());
		timeseries.setSymbolicName(event.getTimeseries().getSymbolicName());
		timeseries.setField(fieldToSet);
		result.setTimeseries(timeseries);

		return result;
	}

	private Object sanitizeValue(Object value) {
		if (value instanceof Number || value instanceof Boolean || value instanceof String)
			return value;

		if (value == null)
			return null;

		return value.toString();
	}

	@Override
	protected boolean validate() {
		try {
			new URI(host).toURL();
		} catch (URISyntaxException | MalformedURLException | IllegalArgumentException ex) {
			log.error("{} is not a valid URL", host);
			return false;
		}

		if (apiKey.isBlank()) {
			log.error("Api Key cannot be blank");
			return false;
		}

		if (timeseriesContainerId < 0) {
			log.error("Timeseries Container ID cannot be less than zero");
			return false;
		}

		return true;
	}

}