ExecutorListenerNotifier.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.earthquake.aws.JsonNotificationIndex;
  3. import gov.usgs.earthquake.product.AbstractListener;
  4. import gov.usgs.util.DefaultConfigurable;
  5. import gov.usgs.util.ExecutorTask;

  6. import java.util.ArrayList;
  7. import java.util.Collection;
  8. import java.util.Date;
  9. import java.util.HashMap;
  10. import java.util.Iterator;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.Timer;
  14. import java.util.concurrent.BlockingQueue;
  15. import java.util.concurrent.ExecutorService;
  16. import java.util.concurrent.Executors;
  17. import java.util.concurrent.ThreadPoolExecutor;
  18. import java.util.logging.Level;
  19. import java.util.logging.Logger;

  20. public class ExecutorListenerNotifier extends DefaultConfigurable implements
  21.         ListenerNotifier {

  22.     private static final Logger LOGGER = Logger
  23.             .getLogger(ExecutorListenerNotifier.class.getName());

  24.     private static ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();

  25.     private DefaultNotificationReceiver receiver;

  26.     /**
  27.      * Notification listeners registered to receive notifications, and an
  28.      * ExecutorService that delivers Notifications to each in a separate thread.
  29.      */
  30.     protected Map<NotificationListener, ExecutorService> notificationListeners = new HashMap<NotificationListener, ExecutorService>();

  31.     /**
  32.      * Make sure listener will accept notification before queueing it for
  33.      * processing.
  34.      */
  35.     protected boolean acceptBeforeQueuing = true;

  36.     /**
  37.      * Timer used to retry tasks when they fail and listeners have configured
  38.      * retryDelay.
  39.      */
  40.     protected Timer retryTimer = new Timer();

  41.     /** When queue size reaches this level, start throttling */
  42.     protected int throttleStartThreshold = 50000;

  43.     /** When queue size reaches this level, stop throttling */
  44.     protected int throttleStopThreshold = 25000;

  45.     /** When throttling, wait this many milliseconds between queue size checks. */
  46.     protected long throttleWaitInterval = 5000L;

  47.     /**
  48.      * Constructor
  49.      * @param receiver DefaultNotificationReceiver
  50.      */
  51.     public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
  52.         this.receiver = receiver;
  53.     }

  54.     /**
  55.      * Add a new notification listener.
  56.      *
  57.      * @param listener
  58.      *            the listener to add. When notifications are received, this
  59.      *            listener will be notified.
  60.      */
  61.     @Override
  62.     public void addNotificationListener(NotificationListener listener)
  63.             throws Exception {
  64.         if (!notificationListeners.containsKey(listener)) {
  65.             // fixed thread pool allows us to inspect the queue length...
  66.             int concurrentProducts = 1;
  67.             if (listener instanceof DefaultNotificationListener) {
  68.                 concurrentProducts =
  69.                         ((DefaultNotificationListener) listener).getConcurrentProducts();
  70.             }
  71.             ExecutorService listenerExecutor = Executors.newFixedThreadPool(
  72.                     concurrentProducts);
  73.             notificationListeners.put(listener, listenerExecutor);
  74.         }
  75.     }

  76.     /**
  77.      * Remove an existing notification listener.
  78.      *
  79.      * Any currently queued notifications are processed before shutting down.
  80.      *
  81.      * @param listener
  82.      *            the listener to remove. When notifications are receive, this
  83.      *            listener will no longer be notified.
  84.      */
  85.     @Override
  86.     public void removeNotificationListener(NotificationListener listener)
  87.             throws Exception {
  88.         // remove listener from map
  89.         ExecutorService listenerExecutor = notificationListeners
  90.                 .remove(listener);

  91.         // shutdown executor thread
  92.         listenerExecutor.shutdown();

  93.         // Could use shutdownNow() instead?
  94.         // however, shutdown() gives all listeners a chance to
  95.         // process all notifications, but may keep client from shutting down
  96.         // quickly. Also, see DefaultNotificationReceiver.shutdown().
  97.     }

  98.     /**
  99.      * Send a notification to all registered NotificationListeners.
  100.      *
  101.      * Creates a NotificationEvent, with a reference to this object and calls
  102.      * each notificationListeners onNotification method in separate threads.
  103.      *
  104.      * This method usually returns before registered NotificationListeners have
  105.      * completed processing a notification.
  106.      *
  107.      * @param event
  108.      *            the notification being sent to listeners.
  109.      * @throws Exception if error occurs
  110.      */
  111.     @Override
  112.     public void notifyListeners(final NotificationEvent event) throws Exception {
  113.         this.notifyListeners(event, this.notificationListeners.keySet());
  114.     }

  115.     /**
  116.      * Calls queueNotification with event and listener for each listener
  117.      * @param event NotificationEvent
  118.      * @param listeners Collection of NotificationListeners
  119.      * @throws Exception if error occurs
  120.      */
  121.     public void notifyListeners(final NotificationEvent event,
  122.             final Collection<NotificationListener> listeners) throws Exception {

  123.         Iterator<NotificationListener> iter = listeners.iterator();
  124.         while (iter.hasNext()) {
  125.             NotificationListener listener = iter.next();
  126.             // only requeue for default notification listeners
  127.             queueNotification(listener, event);
  128.         }
  129.     }

  130.     /**
  131.      * @param listener NotificationListener
  132.      * @param event NotificationEvent
  133.      */
  134.     protected void queueNotification(final NotificationListener listener,
  135.             final NotificationEvent event) {
  136.         if (acceptBeforeQueuing
  137.                 && listener instanceof DefaultNotificationListener) {
  138.             DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
  139.             if (!defaultListener.accept(event.getNotification().getProductId())) {
  140.                 return;
  141.             }
  142.         }

  143.         // determine retry delay
  144.         long retryDelay = 0L;
  145.         if (listener instanceof AbstractListener) {
  146.             retryDelay = ((AbstractListener) listener).getRetryDelay();
  147.         }

  148.         ExecutorService listenerExecutor = notificationListeners.get(listener);
  149.         ExecutorTask<Void> listenerTask = new ExecutorTask<Void>(
  150.                 listenerExecutor, listener.getMaxTries(),
  151.                 listener.getTimeout(), new NotificationListenerCallable(
  152.                         listener, event), retryTimer, retryDelay);
  153.         listenerExecutor.submit(listenerTask);

  154.         // log how many notifications are pending
  155.         if (listenerExecutor instanceof ThreadPoolExecutor) {
  156.             BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
  157.                     .getQueue();
  158.             LOGGER.fine("[" + event.getNotificationReceiver().getName()
  159.                     + "] listener (" + listener.getName() + ") has "
  160.                     + pending.size() + " queued notifications");
  161.         }
  162.     }


  163.     @Override
  164.     public void shutdown() throws Exception {
  165.         // remove all listeners
  166.         Iterator<NotificationListener> iter = new ArrayList<NotificationListener>(
  167.                 notificationListeners.keySet()).iterator();
  168.         while (iter.hasNext()) {
  169.             removeNotificationListener(iter.next());
  170.         }
  171.     }

  172.     @Override
  173.     public void startup() throws Exception {
  174.         super.startup();

  175.         NotificationIndex index = receiver.getNotificationIndex();

  176.         // filter down to listeners who can handle requeueing gracefully
  177.         ArrayList<NotificationListener> gracefulListeners = new ArrayList<NotificationListener>();
  178.         Iterator<NotificationListener> iter = this.notificationListeners
  179.                 .keySet().iterator();
  180.         while (iter.hasNext()) {
  181.             NotificationListener listener = iter.next();
  182.             // make sure each index only notifies each listener once
  183.             String key = listener.getName() + '|' + index.getName();
  184.             if (AUTOLOADED_INDEXES.contains(key)) {
  185.                 // already loaded this notification index for this listener
  186.                 // another receiver is sharing this notification index
  187.             } else if (listener instanceof DefaultNotificationListener
  188.                     && ((DefaultNotificationListener) listener)
  189.                             .getNotificationIndex() != null) {
  190.                 gracefulListeners.add(listener);
  191.                 AUTOLOADED_INDEXES.add(key);
  192.             }
  193.         }

  194.         if (gracefulListeners.size() == 0) {
  195.             // don't bother searching if nobody is listening
  196.             return;
  197.         }

  198.         LOGGER.info("[" + receiver.getName()
  199.                 + "] requeueing notification index '" + index.getName() + "'");
  200.         // find all existing notifications
  201.         List<Notification> allNotifications = null;

  202.         // for json index, push intersection into database if only one listener
  203.         if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) {
  204.             NotificationIndex listenerIndex =
  205.                     ((DefaultNotificationListener) gracefulListeners.get(0))
  206.                     .getNotificationIndex();
  207.             if (listenerIndex instanceof JsonNotificationIndex
  208.                     && !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite")
  209.             ) {
  210.                 // get intersection when potentially sharing database
  211.                 try {
  212.                     allNotifications =
  213.                             ((JsonNotificationIndex) index).getMissingNotifications(
  214.                                     ((JsonNotificationIndex) listenerIndex).getTable());
  215.                 } catch (Exception e) {
  216.                     LOGGER.log(Level.INFO, "Exception loading intersection, continuing", e);
  217.                 }
  218.             }
  219.         }

  220.         if (allNotifications == null) {
  221.             // fallback to previous behavior
  222.             allNotifications = index.findNotifications(
  223.                     (List<String>) null, (List<String>) null, (List<String>) null);
  224.         }
  225.         LOGGER.info("Done finding existing notifications");

  226.         // queue them for processing in case they were previous missed
  227.         Date now = new Date();
  228.         int count = 0;
  229.         for (final Notification notification : allNotifications) {
  230.             NotificationEvent event = new NotificationEvent(receiver, notification);
  231.             count += 1;
  232.             if (event.getNotification().getExpirationDate().after(now)) {
  233.                 // still valid
  234.                 this.notifyListeners(event, gracefulListeners);
  235.             }

  236.             // try to keep queue size managable during restart
  237.             throttleQueues(allNotifications.size() - count);
  238.         }
  239.         LOGGER.info("All notifications queued");

  240.         // keep track that we've processed this notification index
  241.         AUTOLOADED_INDEXES.add(index.getName());
  242.     }

  243.     /** @return default notification receiver */
  244.     public DefaultNotificationReceiver getReceiver() {
  245.         return receiver;
  246.     }

  247.     /** @param receiver of the default notification variety */
  248.     public void setReceiver(DefaultNotificationReceiver receiver) {
  249.         this.receiver = receiver;
  250.     }

  251.     /** @return map of status */
  252.     public Map<String, Integer> getStatus() {
  253.         HashMap<String, Integer> status = new HashMap<String, Integer>();

  254.         for (final NotificationListener listener : notificationListeners.keySet()) {
  255.             ExecutorService listenerExecutor = notificationListeners.get(listener);
  256.             if (listenerExecutor instanceof ThreadPoolExecutor) {
  257.                 // check how many notifications are pending
  258.                 int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
  259.                 status.put(receiver.getName() + " - " + listener.getName(), size);
  260.             }
  261.         }

  262.         return status;
  263.     }

  264.     /**
  265.      * Check queue status and return length of longest queue.
  266.      *
  267.      * @return length of longest queue, or null if no queue lengths.
  268.      */
  269.     public Integer getMaxQueueSize() {
  270.         Integer maxSize = null;
  271.         for (final NotificationListener listener : notificationListeners.keySet()) {
  272.             ExecutorService listenerExecutor = notificationListeners.get(listener);
  273.             if (listenerExecutor instanceof ThreadPoolExecutor) {
  274.                 // check how many notifications are pending
  275.                 int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
  276.                 if (maxSize == null || size > maxSize) {
  277.                     maxSize = size;
  278.                 }
  279.             }
  280.         }
  281.         return maxSize;
  282.     }

  283.     /**
  284.      * If longest queue has more than 50k notifications,
  285.      * wait until longest queue has 25k notifications before returning.
  286.      *
  287.      * @throws InterruptedException if error occurs
  288.      */
  289.     public void throttleQueues() throws InterruptedException {
  290.         throttleQueues(null);
  291.     }

  292.     /**
  293.      * If longest queue has more than 50k notifications,
  294.      * wait until longest queue has 25k notifications before returning.
  295.      *
  296.      * @param remaining integer
  297.      * @throws InterruptedException if error occurs
  298.      */
  299.     public void throttleQueues(Integer remaining) throws InterruptedException {
  300.         // try to keep queue size managable during restart
  301.         int limit = throttleStartThreshold;
  302.         // track whether any throttles occurred
  303.         boolean throttled = false;

  304.         while (true) {
  305.             final Integer size = getMaxQueueSize();
  306.             if (size == null || size <= limit) {
  307.                 // within limit
  308.                 if (throttled) {
  309.                     LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
  310.                 }
  311.                 break;
  312.             }

  313.             throttled = true;
  314.             LOGGER.info("[" + getName() + "]"
  315.                     + " queueing throttled until below "
  316.                     + throttleStopThreshold
  317.                     + " ("
  318.                             + "size=" + size
  319.                             + ", remaining=" + (remaining == null ? "?" : remaining)
  320.                     + ")");
  321.             // too many messages queued
  322.             // set limit to stop threshold
  323.             limit = throttleStopThreshold;
  324.             // wait for listener to do some processing
  325.             // 5s is a little low, but don't want to wait too long
  326.             Thread.sleep(throttleWaitInterval);
  327.         }
  328.     }

  329.     /**
  330.      * NOTE: messing with the executors map is not a good idea.
  331.      *
  332.      * @return the map of listeners and their executors.
  333.      */
  334.     public Map<NotificationListener, ExecutorService> getExecutors() {
  335.         return notificationListeners;
  336.     }

  337.     /** @return int throttle start threshold */
  338.     public int getThrottleStartThreshold() { return this.throttleStartThreshold; }
  339.     /** @param n int throttle start threshold */
  340.     public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; }

  341.     /** @return int throttle stop threshold */
  342.     public int getThrottleStopThreshold() { return this.throttleStopThreshold; }
  343.     /** @param n int throttle stop threshold */
  344.     public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; }

  345.     /** @return int throttle wait interval */
  346.     public long getThrottleWaitInterval() { return this.throttleWaitInterval; }
  347.     /** @param ms long throttle wait interval in ms */
  348.     public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; }

  349. }