ListenerNotifierThread.java

  1. package gov.usgs.earthquake.distribution.roundrobinnotifier;

  2. import java.util.Date;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. import java.util.logging.Level;
  5. import java.util.logging.Logger;

  6. import gov.usgs.earthquake.distribution.DefaultNotificationListener;
  7. import gov.usgs.earthquake.distribution.NotificationEvent;
  8. import gov.usgs.earthquake.distribution.NotificationListener;
  9. import gov.usgs.earthquake.product.AbstractListener;

  10. /**
  11.  * Thread that delivers notifications to a listener.
  12.  *
  13.  * Uses interrupt to stop thread, so listeners should be careful when also using
  14.  * interrupts.
  15.  */
  16. public class ListenerNotifierThread implements Runnable {

  17.     /** Logging object. */
  18.     private static final Logger LOGGER = Logger
  19.             .getLogger(ListenerNotifierThread.class.getName());

  20.     /** Listener that receives notifications. */
  21.     private final NotificationListener listener;
  22.     /** Queue of notifications to deliver. */
  23.     private final ListenerNotificationQueue queue;
  24.     /** Queue of notifications that failed, and should be reattempted. */
  25.     private final LinkedBlockingQueue<ListenerNotification> errorQueue;
  26.     /** Thread where "this" is running. */
  27.     private Thread thread;

  28.     /**
  29.      * Create a new listener notifier thread.
  30.      *
  31.      * @param listener
  32.      *            listener that receives notifications.
  33.      */
  34.     public ListenerNotifierThread(final NotificationListener listener) {
  35.         this.listener = listener;
  36.         this.queue = new ListenerNotificationQueue();
  37.         this.errorQueue = new LinkedBlockingQueue<ListenerNotification>();
  38.         this.thread = null;
  39.     }

  40.     /**
  41.      * Start processing notifications in the queue.
  42.      */
  43.     public void start() {
  44.         if (thread == null) {
  45.             thread = new Thread(this);
  46.             thread.start();
  47.         }
  48.     }

  49.     /**
  50.      * Stop processing notifications in the queue.
  51.      */
  52.     public void stop() {
  53.         if (thread != null) {
  54.             thread.interrupt();
  55.             thread = null;
  56.         }
  57.     }

  58.     /**
  59.      * Process notifications in the queue.
  60.      */
  61.     public void run() {
  62.         ListenerNotification notification = null;
  63.         Date start = null;
  64.         while (!Thread.currentThread().isInterrupted()) {
  65.             try {
  66.                 notification = queue.take();
  67.                 notification.attempts++;
  68.                 start = new Date();
  69.                 listener.onNotification(notification.event);
  70.                 LOGGER.fine("[" + listener.getName() + "] processed "
  71.                         + notification.getProductId() + " in "
  72.                         + (new Date().getTime() - start.getTime()) + "ms");
  73.             } catch (InterruptedException ie) {
  74.                 // thread is stopping
  75.             } catch (Exception e) {
  76.                 // requeue notification
  77.                 if (notification.attempts < listener.getMaxTries()) {
  78.                     // requeue
  79.                     notification.lastAttempt = new Date();
  80.                     errorQueue.add(notification);
  81.                     LOGGER.log(Level.FINE, "[" + listener.getName()
  82.                             + "] exception processing "
  83.                             + notification.getProductId() + " attempt "
  84.                             + notification.attempts + "/"
  85.                             + listener.getMaxTries() + ", requeuing");
  86.                 } else {
  87.                     // couldn't process
  88.                     LOGGER.log(Level.WARNING, "[" + listener.getName()
  89.                             + "] unable to process "
  90.                             + notification.getProductId() + " "
  91.                             + notification.attempts + " attempts", e);
  92.                 }
  93.             }
  94.         }
  95.     }

  96.     /**
  97.      * Add a notification to the queue.
  98.      *
  99.      * Checks if notification is "accept"able before queueing.
  100.      *
  101.      * @param event
  102.      *            notification to add.
  103.      */
  104.     public void notify(final NotificationEvent event) {
  105.         if (listener instanceof AbstractListener) {
  106.             AbstractListener abstractListener = (AbstractListener) listener;
  107.             if (!abstractListener.accept(event.getNotification().getProductId())) {
  108.                 return;
  109.             }
  110.         }
  111.         queue.add(new ListenerNotification(event));
  112.     }

  113.     /**
  114.      * @return the listener.
  115.      */
  116.     public NotificationListener getListener() {
  117.         return listener;
  118.     }

  119.     /**
  120.      * @return the queue.
  121.      */
  122.     public ListenerNotificationQueue getQueue() {
  123.         return queue;
  124.     }

  125.     /**
  126.      * @return the error queue.
  127.      */
  128.     public LinkedBlockingQueue<ListenerNotification> getErrorQueue() {
  129.         return errorQueue;
  130.     }

  131.     /**
  132.      * Move any failed notifications that are ready to be retried from the error
  133.      * queue into the queue.
  134.      */
  135.     public void requeueErrors() {
  136.         ListenerNotification notification;
  137.         Date threshold = new Date();
  138.         if (listener instanceof DefaultNotificationListener) {
  139.             threshold = new Date(threshold.getTime()
  140.                     - ((DefaultNotificationListener) listener).getRetryDelay());
  141.         }
  142.         while (true) {
  143.             notification = errorQueue.peek();
  144.             if (
  145.             // no more notifications
  146.             notification == null ||
  147.             // after threshold
  148.                     notification.lastAttempt.before(threshold)) {
  149.                 break;
  150.             }
  151.             // requeue notification
  152.             queue.add(errorQueue.poll());
  153.         }
  154.     }

  155. }