FutureListenerNotifier.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.AbstractListener;
- import gov.usgs.util.FutureExecutorTask;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.logging.Logger;
- /**
- * FutureListenerNotifier is similar to ExecutorListenerNotifier, but uses
- * Futures with an ExecutorService to implement timeouts instead of Timers.
- *
- * backgroundService is an unbounded executor, but will execute only as many
- * threads are allowed by listener executors since listener executors submit
- * tasks to the backgroundService and wait on the future.
- *
- * This ends up being more efficient because the threads where jobs execute are
- * cached, instead of a new Timer thread created for each task.
- */
- public class FutureListenerNotifier extends ExecutorListenerNotifier {
- private static final Logger LOGGER = Logger
- .getLogger(FutureListenerNotifier.class.getName());
- /** Service where tasks execute using futures for timeouts. */
- private ExecutorService backgroundService;
- /**
- * Constructor
- * @param receiver DefaultNotificationReceiver
- */
- public FutureListenerNotifier(final DefaultNotificationReceiver receiver) {
- super(receiver);
- }
- @Override
- protected void queueNotification(final NotificationListener listener,
- final NotificationEvent event) {
- if (acceptBeforeQueuing
- && listener instanceof DefaultNotificationListener) {
- DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
- if (!defaultListener.accept(event.getNotification().getProductId())) {
- return;
- }
- }
- // determine retry delay
- long retryDelay = 0L;
- if (listener instanceof AbstractListener) {
- retryDelay = ((AbstractListener) listener).getRetryDelay();
- }
- ExecutorService listenerExecutor = notificationListeners.get(listener);
- FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(
- backgroundService, listenerExecutor, listener.getMaxTries(),
- listener.getTimeout(), new NotificationListenerCallable(
- listener, event), retryTimer, retryDelay);
- listenerExecutor.submit(listenerTask);
- // log how many notifications are pending
- if (listenerExecutor instanceof ThreadPoolExecutor) {
- BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
- .getQueue();
- LOGGER.fine("[" + event.getNotificationReceiver().getName()
- + "] listener (" + listener.getName() + ") has "
- + pending.size() + " queued notifications");
- }
- }
- @Override
- public void shutdown() throws Exception {
- super.shutdown();
- backgroundService.shutdown();
- backgroundService = null;
- }
- @Override
- public void startup() throws Exception {
- backgroundService = Executors.newCachedThreadPool();
- super.startup();
- }
- }