ListenerNotifierThread.java
- package gov.usgs.earthquake.distribution.roundrobinnotifier;
- import java.util.Date;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import gov.usgs.earthquake.distribution.DefaultNotificationListener;
- import gov.usgs.earthquake.distribution.NotificationEvent;
- import gov.usgs.earthquake.distribution.NotificationListener;
- import gov.usgs.earthquake.product.AbstractListener;
- /**
- * Thread that delivers notifications to a listener.
- *
- * Uses interrupt to stop thread, so listeners should be careful when also using
- * interrupts.
- */
- public class ListenerNotifierThread implements Runnable {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(ListenerNotifierThread.class.getName());
- /** Listener that receives notifications. */
- private final NotificationListener listener;
- /** Queue of notifications to deliver. */
- private final ListenerNotificationQueue queue;
- /** Queue of notifications that failed, and should be reattempted. */
- private final LinkedBlockingQueue<ListenerNotification> errorQueue;
- /** Thread where "this" is running. */
- private Thread thread;
- /**
- * Create a new listener notifier thread.
- *
- * @param listener
- * listener that receives notifications.
- */
- public ListenerNotifierThread(final NotificationListener listener) {
- this.listener = listener;
- this.queue = new ListenerNotificationQueue();
- this.errorQueue = new LinkedBlockingQueue<ListenerNotification>();
- this.thread = null;
- }
- /**
- * Start processing notifications in the queue.
- */
- public void start() {
- if (thread == null) {
- thread = new Thread(this);
- thread.start();
- }
- }
- /**
- * Stop processing notifications in the queue.
- */
- public void stop() {
- if (thread != null) {
- thread.interrupt();
- thread = null;
- }
- }
- /**
- * Process notifications in the queue.
- */
- public void run() {
- ListenerNotification notification = null;
- Date start = null;
- while (!Thread.currentThread().isInterrupted()) {
- try {
- notification = queue.take();
- notification.attempts++;
- start = new Date();
- listener.onNotification(notification.event);
- LOGGER.fine("[" + listener.getName() + "] processed "
- + notification.getProductId() + " in "
- + (new Date().getTime() - start.getTime()) + "ms");
- } catch (InterruptedException ie) {
- // thread is stopping
- } catch (Exception e) {
- // requeue notification
- if (notification.attempts < listener.getMaxTries()) {
- // requeue
- notification.lastAttempt = new Date();
- errorQueue.add(notification);
- LOGGER.log(Level.FINE, "[" + listener.getName()
- + "] exception processing "
- + notification.getProductId() + " attempt "
- + notification.attempts + "/"
- + listener.getMaxTries() + ", requeuing");
- } else {
- // couldn't process
- LOGGER.log(Level.WARNING, "[" + listener.getName()
- + "] unable to process "
- + notification.getProductId() + " "
- + notification.attempts + " attempts", e);
- }
- }
- }
- }
- /**
- * Add a notification to the queue.
- *
- * Checks if notification is "accept"able before queueing.
- *
- * @param event
- * notification to add.
- */
- public void notify(final NotificationEvent event) {
- if (listener instanceof AbstractListener) {
- AbstractListener abstractListener = (AbstractListener) listener;
- if (!abstractListener.accept(event.getNotification().getProductId())) {
- return;
- }
- }
- queue.add(new ListenerNotification(event));
- }
- /**
- * @return the listener.
- */
- public NotificationListener getListener() {
- return listener;
- }
- /**
- * @return the queue.
- */
- public ListenerNotificationQueue getQueue() {
- return queue;
- }
- /**
- * @return the error queue.
- */
- public LinkedBlockingQueue<ListenerNotification> getErrorQueue() {
- return errorQueue;
- }
- /**
- * Move any failed notifications that are ready to be retried from the error
- * queue into the queue.
- */
- public void requeueErrors() {
- ListenerNotification notification;
- Date threshold = new Date();
- if (listener instanceof DefaultNotificationListener) {
- threshold = new Date(threshold.getTime()
- - ((DefaultNotificationListener) listener).getRetryDelay());
- }
- while (true) {
- notification = errorQueue.peek();
- if (
- // no more notifications
- notification == null ||
- // after threshold
- notification.lastAttempt.before(threshold)) {
- break;
- }
- // requeue notification
- queue.add(errorQueue.poll());
- }
- }
- }