ASink.java
package de.dlr.bt.stc.sink;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;
import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.task.ITask;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
 * Base class for sinks that collect {@link CacheFullEvent}s from an
 * {@link EventBus} and process them later via {@link #handleEvents()}.
 *
 * <p>
 * Events addressed to this sink (matched by {@link #getKey() key}) are buffered
 * in an internal thread-safe queue by {@link #onCacheFullEvent(CacheFullEvent)}
 * and handed one by one to {@link #handleEvent(CacheFullEvent)} when
 * {@link #handleEvents()} is called.
 *
 * <p>
 * The constructor is generated by Lombok from the uninitialized final fields in
 * declaration order: {@code key}, {@code eventBus}, {@code field}.
 */
@Slf4j
@RequiredArgsConstructor
@ToString(onlyExplicitlyIncluded = true)
public abstract class ASink implements ITask {

    /** Identifier of this sink; compared against {@link CacheFullEvent#getSinkKey()}. */
    @ToString.Include
    @Getter
    protected final String key;

    /** Event bus this sink registers on while the task is started. */
    private final EventBus eventBus;

    /** Configured fallback field, used by {@link #decideForField(String)}; may be null or blank. */
    private final String field;

    /** Events received for this sink that have not been handled yet. */
    private final ConcurrentLinkedQueue<CacheFullEvent> eventQueue = new ConcurrentLinkedQueue<>();

    /**
     * Checks the sink configuration via {@link #validate()}.
     *
     * @throws ConfigurationException if {@link #validate()} returns {@code false}
     */
    @Override
    public void initializeTask() throws ConfigurationException {
        log.debug("Initializing sink {}", key);
        if (!validate()) {
            throw new ConfigurationException(String.format("Sink %s is misconfigured", key));
        }
    }

    /**
     * Registers this sink on the event bus so it starts receiving events.
     */
    @Override
    public void startTask() {
        log.debug("Starting sink {}", key);
        eventBus.register(this);
    }

    /**
     * Unregisters this sink from the event bus and processes whatever is still
     * queued.
     */
    @Override
    public void stopTask() {
        log.debug("Stopping sink {}", key);
        eventBus.unregister(this);
        // Unregister first so no new events arrive, then drain what is left.
        handleEvents();
    }

    /**
     * Event bus callback: queues the event if it is addressed to this sink,
     * otherwise ignores it.
     *
     * @param event the posted event
     */
    // NOTE(review): ThreadMode.MAIN is an Android concept; outside Android the
    // delivery thread is presumably the posting thread - confirm against the
    // EventBus version in use.
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onCacheFullEvent(CacheFullEvent event) {
        if (key.equals(event.getSinkKey())) {
            log.debug("Added event {} to sink {}", event, key);
            eventQueue.add(event);
        }
    }

    /**
     * Drains the event queue, passing each event to
     * {@link #handleEvent(CacheFullEvent)} in FIFO order.
     *
     * <p>
     * This method must be called repeatedly by the handling thread. It is also
     * invoked by {@link #stopTask()}, so two callers may drain concurrently;
     * therefore elements are taken with {@code poll()} (which returns
     * {@code null} on an empty queue) rather than an {@code isEmpty()} check
     * followed by {@code remove()}, which could throw
     * {@link java.util.NoSuchElementException} if another thread took the last
     * element in between.
     */
    public void handleEvents() {
        log.debug("Clear event queue for sink {}", key);
        CacheFullEvent event;
        while ((event = eventQueue.poll()) != null) {
            // nanoTime is monotonic, so the measured duration is not affected by
            // wall-clock adjustments.
            long startNanos = System.nanoTime();
            handleEvent(event);
            long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            // Pass the event itself so toString() is only evaluated when debug is enabled.
            log.debug("Handling of event {} took {} ms", event, elapsedMillis);
        }
    }

    /**
     * Returns the number of events currently waiting in the queue.
     *
     * @return current queue size
     */
    public int getQueueSize() {
        return eventQueue.size();
    }

    /**
     * Returns either the fieldFromBridge or the configured field. fieldFromBridge
     * is preferred. Returns null if neither is set.
     *
     * @param fieldFromBridge field name supplied by the bridge; may be null or
     *                        blank
     * @return {@code fieldFromBridge} if non-blank, else the configured field if
     *         non-blank, else {@code null}
     */
    protected String decideForField(String fieldFromBridge) {
        if (fieldFromBridge != null && !fieldFromBridge.isBlank()) {
            return fieldFromBridge;
        }
        if (field != null && !field.isBlank()) {
            return field;
        }
        return null;
    }

    /**
     * Handle cache full events
     *
     * @param event an event previously queued for this sink
     */
    protected abstract void handleEvent(CacheFullEvent event);

    /**
     * Validates the configuration
     *
     * @return whether the configuration is valid or not
     */
    protected abstract boolean validate();
}