RoundRobinListenerNotifier.java

  1. package gov.usgs.earthquake.distribution.roundrobinnotifier;

  2. import java.util.ArrayList;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.logging.Level;
  8. import java.util.logging.Logger;

  9. import gov.usgs.earthquake.distribution.DefaultNotificationListener;
  10. import gov.usgs.earthquake.distribution.ListenerNotifier;
  11. import gov.usgs.earthquake.distribution.Notification;
  12. import gov.usgs.earthquake.distribution.NotificationEvent;
  13. import gov.usgs.earthquake.distribution.NotificationIndex;
  14. import gov.usgs.earthquake.distribution.NotificationListener;
  15. import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
  16. import gov.usgs.util.DefaultConfigurable;

  17. /**
  18.  * Use round-robin queues to notify listeners.
  19.  *
  20.  * This attempts to prevent any one product source+type from blocking processing
  21.  * of notifications from other product source+type.
  22.  */
  23. public class RoundRobinListenerNotifier extends DefaultConfigurable implements
  24.         ListenerNotifier, Runnable {

  25.     /** Logging object. */
  26.     private static final Logger LOGGER = Logger
  27.             .getLogger(RoundRobinListenerNotifier.class.getName());

  28.     /** List of indexes that have already been requeued. */
  29.     private static final ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();

  30.     /** The receiver using this notifier. */
  31.     private final DefaultNotificationReceiver receiver;
  32.     /** Registered notification listeners. */
  33.     private final HashMap<NotificationListener, ListenerNotifierThread> listeners;
  34.     /** Status/requeue thread. */
  35.     private Thread thread;
  36.     /** How often to print status and check for notifications to requeue. */
  37.     private long statusInterval = 5000L;

  38.     /**
  39.      * Create new RoundRobinListenerNotifier.
  40.      *
  41.      * @param receiver
  42.      *            the receiver using this notifier.
  43.      */
  44.     public RoundRobinListenerNotifier(final DefaultNotificationReceiver receiver) {
  45.         this.receiver = receiver;
  46.         this.listeners = new HashMap<NotificationListener, ListenerNotifierThread>();
  47.         this.thread = null;
  48.     }

  49.     /**
  50.      * Start the status/requeue thread.
  51.      */
  52.     public void startup() throws Exception {
  53.         if (thread == null) {
  54.             thread = new Thread(this);
  55.             thread.start();

  56.             requeue();
  57.         }
  58.     }

  59.     /**
  60.      * Stop the status/requeue thread.
  61.      */
  62.     public void shutdown() {
  63.         if (thread != null) {
  64.             thread.interrupt();
  65.             thread = null;
  66.         }
  67.     }

  68.     /**
  69.      * Add a notification listener.
  70.      */
  71.     @Override
  72.     public void addNotificationListener(NotificationListener listener)
  73.             throws Exception {
  74.         if (!listeners.containsKey(listener)) {
  75.             ListenerNotifierThread notifier = new ListenerNotifierThread(
  76.                     listener);
  77.             listeners.put(listener, notifier);
  78.             notifier.start();
  79.         }
  80.     }

  81.     /**
  82.      * Remove a notification listener.
  83.      */
  84.     @Override
  85.     public void removeNotificationListener(NotificationListener listener)
  86.             throws Exception {
  87.         if (listeners.containsKey(listener)) {
  88.             ListenerNotifierThread notifier = listeners.remove(listener);
  89.             notifier.stop();
  90.         }
  91.     }

  92.     /**
  93.      * Notify listeners.
  94.      */
  95.     @Override
  96.     public void notifyListeners(NotificationEvent event) throws Exception {
  97.         notifyListeners(event, listeners.values());
  98.     }

  99.     /**
  100.      * Notify a specific list of listeners.
  101.      *
  102.      * Used during renotification to only notify listeners that have an index.
  103.      *
  104.      * @param event
  105.      *            notification.
  106.      * @param toNotify
  107.      *            list of listeners to notify.
  108.      * @throws Exception
  109.      *            if error occurs
  110.      */
  111.     protected void notifyListeners(NotificationEvent event,
  112.             final Collection<ListenerNotifierThread> toNotify) throws Exception {
  113.         Iterator<ListenerNotifierThread> iter = toNotify.iterator();
  114.         while (iter.hasNext()) {
  115.             iter.next().notify(event);
  116.         }
  117.     }

  118.     /**
  119.      * Run status/requeue tasks.
  120.      */
  121.     public void run() {
  122.         while (!Thread.currentThread().isInterrupted()) {
  123.             try {
  124.                 // run every 5 seconds
  125.                 Thread.sleep(statusInterval);

  126.                 Iterator<ListenerNotifierThread> iter = listeners.values()
  127.                         .iterator();
  128.                 while (iter.hasNext()) {
  129.                     ListenerNotifierThread notifier = iter.next();
  130.                     // requeue errors
  131.                     notifier.requeueErrors();
  132.                     int queued = notifier.getQueue().size();
  133.                     int errors = notifier.getErrorQueue().size();
  134.                     // print status
  135.                     LOGGER.fine("[" + receiver.getName()
  136.                             + "-notifier] listener "
  137.                             + notifier.getListener().getName() + " " + queued
  138.                             + " queued, " + errors + " to retry");
  139.                 }
  140.             } catch (InterruptedException ie) {
  141.                 // stopping
  142.             } catch (Exception e) {
  143.                 LOGGER.log(Level.WARNING, "exception running notifier status",
  144.                         e);
  145.             }
  146.         }
  147.     }

  148.     /**
  149.      * Requeue existing notifications at startup.
  150.      *
  151.      * @throws Exception if error occurs
  152.      */
  153.     protected void requeue() throws Exception {
  154.         NotificationIndex index = receiver.getNotificationIndex();

  155.         ArrayList<ListenerNotifierThread> toRenotify = new ArrayList<ListenerNotifierThread>();
  156.         Iterator<ListenerNotifierThread> iter = listeners.values().iterator();
  157.         while (iter.hasNext()) {
  158.             ListenerNotifierThread notifier = iter.next();
  159.             NotificationListener listener = notifier.getListener();
  160.             if (listener instanceof DefaultNotificationListener
  161.                     && ((DefaultNotificationListener) listener)
  162.                             .getNotificationIndex() != null) {
  163.                 // listener that has notification index
  164.                 String key = index.getName() + "|" + listener.getName();
  165.                 if (!AUTOLOADED_INDEXES.contains(key)) {
  166.                     // not already renotified
  167.                     toRenotify.add(notifier);
  168.                 }
  169.             }
  170.         }
  171.         if (toRenotify.size() == 0) {
  172.             // no listeners to renotify
  173.             return;
  174.         }

  175.         LOGGER.fine("[" + receiver.getName()
  176.                 + "-notifier] requeuing notifications");
  177.         Iterator<Notification> notifications = index.findNotifications(
  178.                 (List<String>) null, (List<String>) null, (List<String>) null)
  179.                 .iterator();
  180.         while (notifications.hasNext()) {
  181.             Notification notification = notifications.next();
  182.             notifyListeners(new NotificationEvent(receiver, notification),
  183.                     toRenotify);
  184.         }
  185.         LOGGER.fine("[" + receiver.getName()
  186.                 + "-notifier] done requeuing notifications");
  187.     }

  188. }