// ListenerNotifierThread.java
package gov.usgs.earthquake.distribution.roundrobinnotifier;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.earthquake.product.AbstractListener;
/**
 * Thread that delivers notifications to a listener.
 *
 * Notifications are taken from a queue one at a time and passed to the
 * listener. Notifications whose delivery throws an exception are placed on a
 * separate error queue until {@link #requeueErrors()} moves them back, up to
 * the listener's maximum number of tries.
 *
 * Uses interrupt to stop thread, so listeners should be careful when also using
 * interrupts.
 */
public class ListenerNotifierThread implements Runnable {

    /** Logging object. */
    private static final Logger LOGGER = Logger
            .getLogger(ListenerNotifierThread.class.getName());

    /** Listener that receives notifications. */
    private final NotificationListener listener;

    /** Queue of notifications to deliver. */
    private final ListenerNotificationQueue queue;

    /** Queue of notifications that failed, and should be reattempted. */
    private final LinkedBlockingQueue<ListenerNotification> errorQueue;

    /** Thread where "this" is running, or null when not started. */
    private Thread thread;

    /**
     * Create a new listener notifier thread.
     *
     * The thread is not started until {@link #start()} is called.
     *
     * @param listener
     *            listener that receives notifications.
     */
    public ListenerNotifierThread(final NotificationListener listener) {
        this.listener = listener;
        this.queue = new ListenerNotificationQueue();
        this.errorQueue = new LinkedBlockingQueue<ListenerNotification>();
        this.thread = null;
    }

    /**
     * Start processing notifications in the queue.
     *
     * Does nothing if a thread has already been started and not stopped.
     */
    public void start() {
        if (thread == null) {
            thread = new Thread(this);
            thread.start();
        }
    }

    /**
     * Stop processing notifications in the queue.
     *
     * Interrupts the processing thread and forgets it; does not wait for the
     * thread to exit. Does nothing if no thread is running.
     */
    public void stop() {
        if (thread != null) {
            thread.interrupt();
            thread = null;
        }
    }

    /**
     * Process notifications in the queue.
     *
     * Loops until the current thread is interrupted. Each notification taken
     * from the queue has its attempt count incremented and is delivered to the
     * listener. When delivery throws an exception, the notification is added
     * to the error queue if it has attempts remaining, otherwise a warning is
     * logged and the notification is dropped.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // scoped per iteration so a failure in take() can never be
            // attributed to the notification from a previous iteration
            ListenerNotification notification = null;
            try {
                notification = queue.take();
                notification.attempts++;
                final long start = System.currentTimeMillis();
                listener.onNotification(notification.event);
                LOGGER.fine("[" + listener.getName() + "] processed "
                        + notification.getProductId() + " in "
                        + (System.currentTimeMillis() - start) + "ms");
            } catch (InterruptedException ie) {
                // thread is stopping; throwing InterruptedException clears
                // the interrupt flag, so restore it to end the loop
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                if (notification == null) {
                    // failed before a notification was obtained, there is
                    // nothing to requeue
                    LOGGER.log(Level.WARNING, "[" + listener.getName()
                            + "] exception taking notification from queue",
                            e);
                    continue;
                }
                if (notification.attempts < listener.getMaxTries()) {
                    // requeue
                    notification.lastAttempt = new Date();
                    errorQueue.add(notification);
                    LOGGER.log(Level.FINE, "[" + listener.getName()
                            + "] exception processing "
                            + notification.getProductId() + " attempt "
                            + notification.attempts + "/"
                            + listener.getMaxTries() + ", requeuing");
                } else {
                    // couldn't process
                    LOGGER.log(Level.WARNING, "[" + listener.getName()
                            + "] unable to process "
                            + notification.getProductId() + " "
                            + notification.attempts + " attempts", e);
                }
            }
        }
    }

    /**
     * Add a notification to the queue.
     *
     * Checks if notification is "accept"able before queueing: when the
     * listener is an {@link AbstractListener} and it does not accept the
     * notification's product id, the event is silently ignored.
     *
     * @param event
     *            notification to add.
     */
    public void notify(final NotificationEvent event) {
        if (listener instanceof AbstractListener) {
            AbstractListener abstractListener = (AbstractListener) listener;
            if (!abstractListener.accept(event.getNotification().getProductId())) {
                return;
            }
        }
        queue.add(new ListenerNotification(event));
    }

    /**
     * @return the listener.
     */
    public NotificationListener getListener() {
        return listener;
    }

    /**
     * @return the queue.
     */
    public ListenerNotificationQueue getQueue() {
        return queue;
    }

    /**
     * @return the error queue.
     */
    public LinkedBlockingQueue<ListenerNotification> getErrorQueue() {
        return errorQueue;
    }

    /**
     * Move any failed notifications that are ready to be retried from the error
     * queue into the queue.
     *
     * A notification is ready when its last attempt is not after the
     * threshold. The threshold is the current time, moved back by the
     * listener's retry delay when the listener is a
     * {@link DefaultNotificationListener}. Processing stops at the first
     * notification that is not ready; the error queue is filled in order of
     * failure, so later entries have later attempt times.
     */
    public void requeueErrors() {
        Date threshold = new Date();
        if (listener instanceof DefaultNotificationListener) {
            threshold = new Date(threshold.getTime()
                    - ((DefaultNotificationListener) listener).getRetryDelay());
        }
        while (true) {
            final ListenerNotification notification = errorQueue.peek();
            if (
            // no more notifications
            notification == null ||
            // last attempt is after threshold, retry delay has not elapsed
                    notification.lastAttempt.after(threshold)) {
                break;
            }
            // requeue notification
            queue.add(errorQueue.poll());
        }
    }
}