// RoundRobinListenerNotifier.java

package gov.usgs.earthquake.distribution.roundrobinnotifier;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.ListenerNotifier;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.util.DefaultConfigurable;

/**
 * Use round-robin queues to notify listeners.
 *
 * This attempts to prevent any one product source+type from blocking processing
 * of notifications from other product source+type.
 *
 * Each registered listener gets its own {@link ListenerNotifierThread}; a
 * separate status thread (this object's {@link #run()} method) periodically
 * requeues failed notifications and logs queue sizes.
 *
 * NOTE(review): {@code listeners} is a plain {@link HashMap} that is read by
 * the status thread and modified by add/removeNotificationListener; confirm
 * that callers serialize access.
 */
public class RoundRobinListenerNotifier extends DefaultConfigurable implements
		ListenerNotifier, Runnable {

	/** Logging object. */
	private static final Logger LOGGER = Logger
			.getLogger(RoundRobinListenerNotifier.class.getName());

	/**
	 * Keys ("indexName|listenerName") of indexes that have already been
	 * requeued; shared by all notifier instances.
	 */
	private static final ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();

	/** The receiver using this notifier. */
	private final DefaultNotificationReceiver receiver;
	/** Registered notification listeners, and the thread notifying each. */
	private final HashMap<NotificationListener, ListenerNotifierThread> listeners;
	/** Status/requeue thread; null when not running. */
	private Thread thread;
	/** How often (milliseconds) to print status and requeue errors. */
	private long statusInterval = 5000L;

	/**
	 * Create new RoundRobinListenerNotifier.
	 *
	 * @param receiver
	 *            the receiver using this notifier.
	 */
	public RoundRobinListenerNotifier(final DefaultNotificationReceiver receiver) {
		this.receiver = receiver;
		this.listeners = new HashMap<NotificationListener, ListenerNotifierThread>();
		this.thread = null;
	}

	/**
	 * Start the status/requeue thread, then requeue existing notifications.
	 *
	 * Does nothing when the status thread was already started.
	 *
	 * @throws Exception
	 *             if requeuing existing notifications fails.
	 */
	public void startup() throws Exception {
		if (thread != null) {
			// already started
			return;
		}
		thread = new Thread(this);
		thread.start();

		requeue();
	}

	/**
	 * Stop the status/requeue thread.
	 *
	 * The thread is interrupted, which ends the loop in {@link #run()}.
	 */
	public void shutdown() {
		if (thread != null) {
			thread.interrupt();
			thread = null;
		}
	}

	/**
	 * Add a notification listener.
	 *
	 * Creates and starts a notifier thread for the listener, unless the
	 * listener is already registered.
	 *
	 * @param listener
	 *            the listener to add.
	 * @throws Exception
	 *             if error occurs
	 */
	@Override
	public void addNotificationListener(NotificationListener listener)
			throws Exception {
		if (listeners.containsKey(listener)) {
			// already registered
			return;
		}
		ListenerNotifierThread notifier = new ListenerNotifierThread(listener);
		listeners.put(listener, notifier);
		notifier.start();
	}

	/**
	 * Remove a notification listener.
	 *
	 * Stops the listener's notifier thread; does nothing when the listener is
	 * not registered.
	 *
	 * @param listener
	 *            the listener to remove.
	 * @throws Exception
	 *             if error occurs
	 */
	@Override
	public void removeNotificationListener(NotificationListener listener)
			throws Exception {
		if (listeners.containsKey(listener)) {
			ListenerNotifierThread notifier = listeners.remove(listener);
			notifier.stop();
		}
	}

	/**
	 * Notify all registered listeners.
	 *
	 * @param event
	 *            notification.
	 * @throws Exception
	 *             if error occurs
	 */
	@Override
	public void notifyListeners(NotificationEvent event) throws Exception {
		notifyListeners(event, listeners.values());
	}

	/**
	 * Notify a specific list of listeners.
	 *
	 * Used during renotification to only notify listeners that have an index.
	 *
	 * @param event
	 *            notification.
	 * @param toNotify
	 *            list of listeners to notify.
	 * @throws Exception
	 *            if error occurs
	 */
	protected void notifyListeners(NotificationEvent event,
			final Collection<ListenerNotifierThread> toNotify) throws Exception {
		for (ListenerNotifierThread notifier : toNotify) {
			notifier.notify(event);
		}
	}

	/**
	 * Run status/requeue tasks.
	 *
	 * Every {@code statusInterval} milliseconds, requeues errors for each
	 * listener and logs its queue sizes. Returns once the running thread is
	 * interrupted (see {@link #shutdown()}).
	 */
	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			try {
				// wait one status interval between passes
				Thread.sleep(statusInterval);

				for (ListenerNotifierThread notifier : listeners.values()) {
					// requeue errors
					notifier.requeueErrors();
					int queued = notifier.getQueue().size();
					int errors = notifier.getErrorQueue().size();
					// print status
					LOGGER.fine("[" + receiver.getName()
							+ "-notifier] listener "
							+ notifier.getListener().getName() + " " + queued
							+ " queued, " + errors + " to retry");
				}
			} catch (InterruptedException ie) {
				// stopping: sleep() cleared the interrupt flag when it threw,
				// so restore it; otherwise the loop condition never sees the
				// interrupt and the thread keeps running after shutdown()
				Thread.currentThread().interrupt();
			} catch (Exception e) {
				LOGGER.log(Level.WARNING, "exception running notifier status",
						e);
			}
		}
	}

	/**
	 * Requeue existing notifications at startup.
	 *
	 * Only listeners that are a {@link DefaultNotificationListener} with a
	 * non-null notification index, and whose index/listener key has not
	 * already been requeued, are renotified. Keys are recorded once the
	 * requeue completes so a later call does not requeue them again.
	 *
	 * @throws Exception if error occurs
	 */
	protected void requeue() throws Exception {
		NotificationIndex index = receiver.getNotificationIndex();

		List<ListenerNotifierThread> toRenotify = new ArrayList<ListenerNotifierThread>();
		List<String> requeuedKeys = new ArrayList<String>();
		for (ListenerNotifierThread notifier : listeners.values()) {
			NotificationListener listener = notifier.getListener();
			if (!(listener instanceof DefaultNotificationListener)
					|| ((DefaultNotificationListener) listener)
							.getNotificationIndex() == null) {
				// listener without a notification index
				continue;
			}
			String key = index.getName() + "|" + listener.getName();
			if (!AUTOLOADED_INDEXES.contains(key)) {
				// not already renotified
				toRenotify.add(notifier);
				requeuedKeys.add(key);
			}
		}
		if (toRenotify.isEmpty()) {
			// no listeners to renotify
			return;
		}

		LOGGER.fine("[" + receiver.getName()
				+ "-notifier] requeuing notifications");
		// null search criteria: all notifications in the receiver index
		List<Notification> notifications = index.findNotifications(
				(List<String>) null, (List<String>) null, (List<String>) null);
		for (Notification notification : notifications) {
			notifyListeners(new NotificationEvent(receiver, notification),
					toRenotify);
		}
		// record only after every notification was queued without error, so
		// a failed requeue can be attempted again
		AUTOLOADED_INDEXES.addAll(requeuedKeys);
		LOGGER.fine("[" + receiver.getName()
				+ "-notifier] done requeuing notifications");
	}

}