ASink.java

package de.dlr.bt.stc.sink;

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.configuration2.ex.ConfigurationException;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;

import de.dlr.bt.stc.eventbus.CacheFullEvent;
import de.dlr.bt.stc.task.ITask;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

/**
 * Base class for sinks that receive {@link CacheFullEvent}s over an
 * {@link EventBus}, buffer them in an internal queue and process them when
 * {@link #handleEvents()} is invoked.
 * <p>
 * Subclasses supply the actual processing in {@link #handleEvent(CacheFullEvent)}
 * and the configuration check in {@link #validate()}.
 */
@Slf4j
@RequiredArgsConstructor
@ToString(onlyExplicitlyIncluded = true)
public abstract class ASink implements ITask {
	/** Key of this sink; only events whose sink key equals it are queued. */
	@ToString.Include
	@Getter
	protected final String key;
	/** Event bus this sink registers on while it is started. */
	private final EventBus eventBus;
	/** Configured fallback field, see {@link #decideForField(String)}. May be null. */
	private final String field;
	/** Events accepted by {@link #onCacheFullEvent(CacheFullEvent)} and not yet handled. */
	private final ConcurrentLinkedQueue<CacheFullEvent> eventQueue = new ConcurrentLinkedQueue<>();

	/**
	 * Checks the configuration of this sink via {@link #validate()}.
	 *
	 * @throws ConfigurationException if {@link #validate()} returns {@code false}
	 */
	@Override
	public void initializeTask() throws ConfigurationException {
		log.debug("Initializing sink {}", key);
		if (!validate()) {
			throw new ConfigurationException(String.format("Sink %s is misconfigured", key));
		}
	}

	/**
	 * Registers this sink on the event bus so that it starts receiving events.
	 */
	@Override
	public void startTask() {
		log.debug("Starting sink {}", key);
		eventBus.register(this);
	}

	/**
	 * Unregisters this sink from the event bus and then handles all events that
	 * are still queued.
	 */
	@Override
	public void stopTask() {
		log.debug("Stopping sink {}", key);
		eventBus.unregister(this);
		// Clear the event queue
		handleEvents();
	}

	/**
	 * Event bus callback: queues the event if it is addressed to this sink, i.e.
	 * if its sink key equals {@link #key}. Other events are ignored.
	 *
	 * @param event the published event
	 */
	@Subscribe(threadMode = ThreadMode.MAIN)
	public void onCacheFullEvent(CacheFullEvent event) {
		if (key.equals(event.getSinkKey())) {
			log.debug("Added event {} to sink {}", event, key);
			eventQueue.add(event);
		}
	}

	/**
	 * Handles all currently queued events, one after another, by passing each to
	 * {@link #handleEvent(CacheFullEvent)}.
	 * <p>
	 * This method must be called repeatedly by the handling thread. It is also
	 * called by {@link #stopTask()}.
	 */
	public void handleEvents() {
		log.debug("Clear event queue for sink {}", key);
		// poll() checks and removes in one step. A separate isEmpty()/remove() pair
		// could throw NoSuchElementException if another caller (e.g. stopTask())
		// drains the queue between the two calls.
		CacheFullEvent event;
		while ((event = eventQueue.poll()) != null) {
			// nanoTime() is not affected by wall-clock adjustments
			var startNanos = System.nanoTime();
			handleEvent(event);
			var elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
			// Pass the event itself so that toString() only runs if debug is enabled
			log.debug("Handling of event {} took {} ms", event, elapsedMillis);
		}
	}

	/**
	 * @return the number of events currently waiting in the queue
	 */
	public int getQueueSize() {
		return eventQueue.size();
	}

	/**
	 * Returns either the fieldFromBridge or the configured field. fieldFromBridge
	 * is preferred. Returns null if neither is set.
	 *
	 * @param fieldFromBridge field supplied by the bridge; may be null or blank
	 * @return {@code fieldFromBridge} if it is non-blank, otherwise the configured
	 *         field if that is non-blank, otherwise {@code null}
	 */
	protected String decideForField(String fieldFromBridge) {
		if (fieldFromBridge != null && !fieldFromBridge.isBlank()) {
			return fieldFromBridge;
		} else if (field != null && !field.isBlank()) {
			return field;
		} else {
			return null;
		}
	}

	/**
	 * Handle cache full events
	 *
	 * @param event the event taken from the queue; never null
	 */
	protected abstract void handleEvent(CacheFullEvent event);

	/**
	 * Validates the configuration
	 * 
	 * @return whether the configuration is valid or not
	 */
	protected abstract boolean validate();

}