FutureListenerNotifier.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.earthquake.product.AbstractListener;
  3. import gov.usgs.util.FutureExecutorTask;

  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. import java.util.logging.Logger;

  9. /**
  10.  * FutureListenerNotifier is similar to ExecutorListenerNotifier, but uses
  11.  * Futures with an ExecutorService to implement timeouts instead of Timers.
  12.  *
  13.  * backgroundService is an unbounded executor, but will execute only as many
  14.  * threads are allowed by listener executors since listener executors submit
  15.  * tasks to the backgroundService and wait on the future.
  16.  *
  17.  * This ends up being more efficient because the threads where jobs execute are
  18.  * cached, instead of a new Timer thread created for each task.
  19.  */
  20. public class FutureListenerNotifier extends ExecutorListenerNotifier {

  21.   private static final Logger LOGGER = Logger
  22.       .getLogger(FutureListenerNotifier.class.getName());

  23.   /** Service where tasks execute using futures for timeouts. */
  24.   private ExecutorService backgroundService;

  25.   /**
  26.    * Constructor
  27.    * @param receiver DefaultNotificationReceiver
  28.    */
  29.   public FutureListenerNotifier(final DefaultNotificationReceiver receiver) {
  30.     super(receiver);
  31.   }

  32.   @Override
  33.   protected void queueNotification(final NotificationListener listener,
  34.       final NotificationEvent event) {
  35.     if (acceptBeforeQueuing
  36.         && listener instanceof DefaultNotificationListener) {
  37.       DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
  38.       if (!defaultListener.accept(event.getNotification().getProductId())) {
  39.         return;
  40.       }
  41.     }

  42.     // determine retry delay
  43.     long retryDelay = 0L;
  44.     if (listener instanceof AbstractListener) {
  45.       retryDelay = ((AbstractListener) listener).getRetryDelay();
  46.     }

  47.     ExecutorService listenerExecutor = notificationListeners.get(listener);
  48.     FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(
  49.         backgroundService, listenerExecutor, listener.getMaxTries(),
  50.         listener.getTimeout(), new NotificationListenerCallable(
  51.             listener, event), retryTimer, retryDelay);
  52.     listenerExecutor.submit(listenerTask);

  53.     // log how many notifications are pending
  54.     if (listenerExecutor instanceof ThreadPoolExecutor) {
  55.       BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
  56.           .getQueue();
  57.       LOGGER.fine("[" + event.getNotificationReceiver().getName()
  58.           + "] listener (" + listener.getName() + ") has "
  59.           + pending.size() + " queued notifications");
  60.     }
  61.   }

  62.   @Override
  63.   public void shutdown() throws Exception {
  64.     super.shutdown();
  65.     backgroundService.shutdown();
  66.     backgroundService = null;
  67.   }

  68.   @Override
  69.   public void startup() throws Exception {
  70.     backgroundService = Executors.newCachedThreadPool();
  71.     super.startup();
  72.   }

  73. }