ListenerNotifierThread.java

package gov.usgs.earthquake.distribution.roundrobinnotifier;

import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.earthquake.product.AbstractListener;

/**
 * Thread that delivers notifications to a listener.
 * 
 * Uses interrupt to stop thread, so listeners should be careful when also using
 * interrupts.
 */
public class ListenerNotifierThread implements Runnable {

	/** Logging object. */
	private static final Logger LOGGER = Logger
			.getLogger(ListenerNotifierThread.class.getName());

	/** Listener that receives notifications. */
	private final NotificationListener listener;
	/** Queue of notifications to deliver. */
	private final ListenerNotificationQueue queue;
	/** Queue of notifications that failed, and should be reattempted. */
	private final LinkedBlockingQueue<ListenerNotification> errorQueue;
	/** Thread where "this" is running. */
	private Thread thread;

	/**
	 * Create a new listener notifier thread.
	 * 
	 * @param listener
	 *            listener that receives notifications.
	 */
	public ListenerNotifierThread(final NotificationListener listener) {
		this.listener = listener;
		this.queue = new ListenerNotificationQueue();
		this.errorQueue = new LinkedBlockingQueue<ListenerNotification>();
		this.thread = null;
	}

	/**
	 * Start processing notifications in the queue.
	 */
	public void start() {
		if (thread == null) {
			thread = new Thread(this);
			thread.start();
		}
	}

	/**
	 * Stop processing notifications in the queue.
	 */
	public void stop() {
		if (thread != null) {
			thread.interrupt();
			thread = null;
		}
	}

	/**
	 * Process notifications in the queue.
	 */
	public void run() {
		ListenerNotification notification = null;
		Date start = null;
		while (!Thread.currentThread().isInterrupted()) {
			try {
				notification = queue.take();
				notification.attempts++;
				start = new Date();
				listener.onNotification(notification.event);
				LOGGER.fine("[" + listener.getName() + "] processed "
						+ notification.getProductId() + " in "
						+ (new Date().getTime() - start.getTime()) + "ms");
			} catch (InterruptedException ie) {
				// thread is stopping
			} catch (Exception e) {
				// requeue notification
				if (notification.attempts < listener.getMaxTries()) {
					// requeue
					notification.lastAttempt = new Date();
					errorQueue.add(notification);
					LOGGER.log(Level.FINE, "[" + listener.getName()
							+ "] exception processing "
							+ notification.getProductId() + " attempt "
							+ notification.attempts + "/"
							+ listener.getMaxTries() + ", requeuing");
				} else {
					// couldn't process
					LOGGER.log(Level.WARNING, "[" + listener.getName()
							+ "] unable to process "
							+ notification.getProductId() + " "
							+ notification.attempts + " attempts", e);
				}
			}
		}
	}

	/**
	 * Add a notification to the queue.
	 * 
	 * Checks if notification is "accept"able before queueing.
	 * 
	 * @param event
	 *            notification to add.
	 */
	public void notify(final NotificationEvent event) {
		if (listener instanceof AbstractListener) {
			AbstractListener abstractListener = (AbstractListener) listener;
			if (!abstractListener.accept(event.getNotification().getProductId())) {
				return;
			}
		}
		queue.add(new ListenerNotification(event));
	}

	/**
	 * @return the listener.
	 */
	public NotificationListener getListener() {
		return listener;
	}

	/**
	 * @return the queue.
	 */
	public ListenerNotificationQueue getQueue() {
		return queue;
	}

	/**
	 * @return the error queue.
	 */
	public LinkedBlockingQueue<ListenerNotification> getErrorQueue() {
		return errorQueue;
	}

	/**
	 * Move any failed notifications that are ready to be retried from the error
	 * queue into the queue.
	 */
	public void requeueErrors() {
		ListenerNotification notification;
		Date threshold = new Date();
		if (listener instanceof DefaultNotificationListener) {
			threshold = new Date(threshold.getTime()
					- ((DefaultNotificationListener) listener).getRetryDelay());
		}
		while (true) {
			notification = errorQueue.peek();
			if (
			// no more notifications
			notification == null ||
			// after threshold
					notification.lastAttempt.before(threshold)) {
				break;
			}
			// requeue notification
			queue.add(errorQueue.poll());
		}
	}

}