// RoundRobinListenerNotifier.java
package gov.usgs.earthquake.distribution.roundrobinnotifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.ListenerNotifier;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.util.DefaultConfigurable;
/**
 * Use round-robin queues to notify listeners.
 *
 * This attempts to prevent any one product source+type from blocking processing
 * of notifications from other product source+type.
 *
 * Each registered listener is wrapped in its own {@link ListenerNotifierThread}.
 * In addition, this object is a {@link Runnable}: once {@link #startup()} has
 * been called, a status thread wakes up every {@code statusInterval}
 * milliseconds, asks every notifier to requeue its errors and logs the size of
 * each notifier's queue and error queue.
 *
 * NOTE(review): the listener map is a plain {@link HashMap} that is also
 * iterated by the status thread, and this class does no synchronization of its
 * own. Adding or removing listeners while the status thread is running may make
 * that iteration fail; such failures are logged by {@link #run()} and the loop
 * continues.
 */
public class RoundRobinListenerNotifier extends DefaultConfigurable implements
        ListenerNotifier, Runnable {

    /** Logging object. */
    private static final Logger LOGGER = Logger
            .getLogger(RoundRobinListenerNotifier.class.getName());

    /**
     * Keys ("indexName|listenerName") that have already been requeued.
     *
     * Static, so it is shared by every notifier instance in the JVM. All
     * access goes through {@code synchronized (AUTOLOADED_INDEXES)}.
     */
    private static final ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();

    /** The receiver using this notifier. */
    private final DefaultNotificationReceiver receiver;

    /** Registered notification listeners, mapped to their notifier. */
    private final HashMap<NotificationListener, ListenerNotifierThread> listeners;

    /** Status/requeue thread; null while not started. */
    private Thread thread;

    /** How often (milliseconds) to print status and requeue errors. */
    private long statusInterval = 5000L;

    /**
     * Create new RoundRobinListenerNotifier.
     *
     * @param receiver
     *            the receiver using this notifier.
     */
    public RoundRobinListenerNotifier(final DefaultNotificationReceiver receiver) {
        this.receiver = receiver;
        this.listeners = new HashMap<NotificationListener, ListenerNotifierThread>();
        this.thread = null;
    }

    /**
     * Start the status/requeue thread, then requeue existing notifications.
     *
     * Does nothing when the status thread has already been started.
     *
     * @throws Exception
     *             if requeuing existing notifications fails.
     */
    public void startup() throws Exception {
        if (thread == null) {
            thread = new Thread(this);
            thread.start();
            requeue();
        }
    }

    /**
     * Stop the status/requeue thread.
     *
     * Only the status thread is interrupted; notifiers of registered
     * listeners are left untouched.
     */
    public void shutdown() {
        if (thread != null) {
            thread.interrupt();
            thread = null;
        }
    }

    /**
     * Add a notification listener.
     *
     * A new notifier is created and started for the listener; a listener that
     * is already registered is ignored.
     *
     * @param listener
     *            the listener to add.
     * @throws Exception
     *             if the notifier cannot be created or started.
     */
    @Override
    public void addNotificationListener(NotificationListener listener)
            throws Exception {
        if (!listeners.containsKey(listener)) {
            ListenerNotifierThread notifier = new ListenerNotifierThread(
                    listener);
            listeners.put(listener, notifier);
            notifier.start();
        }
    }

    /**
     * Remove a notification listener.
     *
     * The listener's notifier is stopped; an unknown listener is ignored.
     *
     * @param listener
     *            the listener to remove.
     * @throws Exception
     *             if stopping the notifier fails.
     */
    @Override
    public void removeNotificationListener(NotificationListener listener)
            throws Exception {
        // values are never null, so a null result means "was not registered"
        ListenerNotifierThread notifier = listeners.remove(listener);
        if (notifier != null) {
            notifier.stop();
        }
    }

    /**
     * Notify all registered listeners.
     *
     * @param event
     *            notification.
     * @throws Exception
     *             if error occurs
     */
    @Override
    public void notifyListeners(NotificationEvent event) throws Exception {
        notifyListeners(event, listeners.values());
    }

    /**
     * Notify a specific list of listeners.
     *
     * Used during renotification to only notify listeners that have an index.
     *
     * @param event
     *            notification.
     * @param toNotify
     *            list of listeners to notify.
     * @throws Exception
     *             if error occurs
     */
    protected void notifyListeners(NotificationEvent event,
            final Collection<ListenerNotifierThread> toNotify) throws Exception {
        for (ListenerNotifierThread notifier : toNotify) {
            notifier.notify(event);
        }
    }

    /**
     * Run status/requeue tasks until the running thread is interrupted.
     *
     * Every {@code statusInterval} milliseconds each notifier is asked to
     * requeue its errors and its queue sizes are logged at FINE level.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(statusInterval);
                for (ListenerNotifierThread notifier : listeners.values()) {
                    // requeue errors
                    notifier.requeueErrors();
                    int queued = notifier.getQueue().size();
                    int errors = notifier.getErrorQueue().size();
                    // print status
                    LOGGER.fine("[" + receiver.getName()
                            + "-notifier] listener "
                            + notifier.getListener().getName() + " " + queued
                            + " queued, " + errors + " to retry");
                }
            } catch (InterruptedException ie) {
                // stopping: sleep() clears the interrupt flag when it throws,
                // so restore it, otherwise the loop condition never becomes
                // false and the thread outlives shutdown()
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "exception running notifier status",
                        e);
            }
        }
    }

    /**
     * Requeue existing notifications at startup.
     *
     * Only listeners that are {@link DefaultNotificationListener}s with a
     * non-null notification index are renotified, and only when their
     * "indexName|listenerName" key has not been requeued before. After every
     * notification from the receiver's index has been handed to those
     * listeners, their keys are recorded so the same pair is not requeued
     * again.
     *
     * @throws Exception
     *             if error occurs
     */
    protected void requeue() throws Exception {
        NotificationIndex index = receiver.getNotificationIndex();
        ArrayList<ListenerNotifierThread> toRenotify = new ArrayList<ListenerNotifierThread>();
        ArrayList<String> renotifyKeys = new ArrayList<String>();

        for (ListenerNotifierThread notifier : listeners.values()) {
            NotificationListener listener = notifier.getListener();
            if (!(listener instanceof DefaultNotificationListener)
                    || ((DefaultNotificationListener) listener)
                            .getNotificationIndex() == null) {
                // listener without notification index
                continue;
            }
            String key = index.getName() + "|" + listener.getName();
            boolean alreadyRequeued;
            synchronized (AUTOLOADED_INDEXES) {
                alreadyRequeued = AUTOLOADED_INDEXES.contains(key);
            }
            if (!alreadyRequeued) {
                toRenotify.add(notifier);
                renotifyKeys.add(key);
            }
        }

        if (toRenotify.isEmpty()) {
            // no listeners to renotify
            return;
        }

        LOGGER.fine("[" + receiver.getName()
                + "-notifier] requeuing notifications");
        Iterator<Notification> notifications = index.findNotifications(
                (List<String>) null, (List<String>) null, (List<String>) null)
                .iterator();
        while (notifications.hasNext()) {
            Notification notification = notifications.next();
            notifyListeners(new NotificationEvent(receiver, notification),
                    toRenotify);
        }

        // remember what was requeued; done last so a failure above does not
        // mark the pair as handled
        synchronized (AUTOLOADED_INDEXES) {
            for (String key : renotifyKeys) {
                if (!AUTOLOADED_INDEXES.contains(key)) {
                    AUTOLOADED_INDEXES.add(key);
                }
            }
        }
        LOGGER.fine("[" + receiver.getName()
                + "-notifier] done requeuing notifications");
    }
}