ShepardSink.java
package de.dlr.bt.stc.sink.shepard;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;
import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.sink.ASink;
import de.dlr.shepard.client.java.ApiException;
import de.dlr.shepard.client.java.api.TimeseriesApi;
import de.dlr.shepard.client.java.model.InfluxPoint;
import de.dlr.shepard.client.java.model.Timeseries;
import de.dlr.shepard.client.java.model.TimeseriesPayload;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ShepardSink extends ASink {
private ShepardApiFactory shepardApiFactory = new ShepardApiFactory();
private TimeseriesApi timeseriesApi = null;
@Getter
private final String host;
private final String apiKey;
@Getter
private final Long timeseriesContainerId;
private final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
public ShepardSink(String key, SinkShepardCfg config, EventBus eventBus) {
super(key, eventBus, config.getField());
this.host = config.getHost();
this.apiKey = config.getApiKey();
this.timeseriesContainerId = config.getTimeseriesContainerId();
}
@Override
public void initializeTask() throws ConfigurationException {
super.initializeTask();
timeseriesApi = shepardApiFactory.getTimeseriesApi(host, apiKey);
}
@Override
public void stopTask() {
super.stopTask();
executor.shutdown();
}
@Override
public void joinTask() throws InterruptedException {
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("ASink thread was interrupted!");
Thread.currentThread().interrupt();
}
}
public int getExecutorActiveCount() {
return executor.getActiveCount();
}
public int getExecutorPoolSize() {
return executor.getPoolSize();
}
public int getExecutorLargestPoolSize() {
return executor.getLargestPoolSize();
}
@Override
protected void handleEvent(CacheFullEvent event) {
executor.execute(() -> sendToShepard(event));
}
private void sendToShepard(CacheFullEvent event) {
log.info("Received CacheFullEvent in sink {}: {}", key, event.toString());
if (timeseriesApi == null) {
log.error("Timeseries Api not yet initialized. Dropping cache full event...");
return;
}
var payload = convertCacheFullEvent(event);
try {
timeseriesApi.createTimeseries(timeseriesContainerId, payload);
} catch (ApiException e) {
log.error("ApiException in shepard sink {}: {}", key, e.toString());
} catch (Exception e) {
log.error("Non-Api Exception in shepard sink {}: {}", key, e.toString());
}
}
private TimeseriesPayload convertCacheFullEvent(CacheFullEvent event) {
var result = new TimeseriesPayload();
List<InfluxPoint> points = new ArrayList<>(event.getData().size());
for (var dp : event.getData()) {
var value = sanitizeValue(dp.getValue());
if (value == null) {
log.warn("value is null, ignoring...");
continue;
}
var ip = new InfluxPoint();
ip.setTimestamp(dp.getTimestamp());
ip.setValue(value);
points.add(ip);
}
result.setPoints(points);
var fieldToSet = decideForField(event.getTimeseries().getField());
if (fieldToSet == null) {
log.error("No field configured for timeseries {} in sink {}", event.getTimeseries(), key);
}
var timeseries = new Timeseries();
timeseries.setMeasurement(event.getTimeseries().getMeasurement());
timeseries.setLocation(event.getTimeseries().getLocation());
timeseries.setDevice(event.getTimeseries().getDevice());
timeseries.setSymbolicName(event.getTimeseries().getSymbolicName());
timeseries.setField(fieldToSet);
result.setTimeseries(timeseries);
return result;
}
private Object sanitizeValue(Object value) {
if (value instanceof Number || value instanceof Boolean || value instanceof String)
return value;
if (value == null)
return null;
return value.toString();
}
@Override
protected boolean validate() {
try {
new URI(host).toURL();
} catch (URISyntaxException | MalformedURLException | IllegalArgumentException ex) {
log.error("{} is not a valid URL", host);
return false;
}
if (apiKey.isBlank()) {
log.error("Api Key cannot be blank");
return false;
}
if (timeseriesContainerId < 0) {
log.error("Timeseries Container ID cannot be less than zero");
return false;
}
return true;
}
}