FutureListenerNotifier.java
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.util.FutureExecutorTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Logger;
/**
* FutureListenerNotifier is similar to ExecutorListenerNotifier, but uses
* Futures with an ExecutorService to implement timeouts instead of Timers.
*
* backgroundService is an unbounded executor, but will execute only as many
* threads as are allowed by the listener executors, since listener executors
* submit tasks to the backgroundService and wait on the future.
*
* This ends up being more efficient because the threads where jobs execute are
* cached, instead of a new Timer thread created for each task.
*/
public class FutureListenerNotifier extends ExecutorListenerNotifier {

    /** Logger for this class. */
    private static final Logger LOGGER = Logger
            .getLogger(FutureListenerNotifier.class.getName());

    /**
     * Service where tasks execute using futures for timeouts.
     *
     * Created by {@link #startup()} and released by {@link #shutdown()};
     * null whenever this notifier has not been started.
     */
    private ExecutorService backgroundService;

    /**
     * Constructor
     * @param receiver DefaultNotificationReceiver
     */
    public FutureListenerNotifier(final DefaultNotificationReceiver receiver) {
        super(receiver);
    }

    /**
     * Queue a notification event for one listener.
     *
     * When acceptBeforeQueuing is set and the listener is a
     * DefaultNotificationListener, the listener is first asked whether it
     * accepts the product id of the notification; rejected notifications are
     * dropped without being queued. Otherwise a FutureExecutorTask wrapping a
     * NotificationListenerCallable is submitted to the executor registered
     * for the listener in notificationListeners.
     *
     * @param listener listener that should receive the event
     * @param event event to deliver
     */
    @Override
    protected void queueNotification(final NotificationListener listener,
            final NotificationEvent event) {
        if (acceptBeforeQueuing
                && listener instanceof DefaultNotificationListener) {
            DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
            if (!defaultListener.accept(event.getNotification().getProductId())) {
                // listener is not interested in this product, nothing to queue
                return;
            }
        }

        // determine retry delay; only AbstractListener exposes one
        long retryDelay = 0L;
        if (listener instanceof AbstractListener) {
            retryDelay = ((AbstractListener) listener).getRetryDelay();
        }

        // NOTE(review): assumes the listener is still registered, so the
        // lookup below does not return null - confirm against the superclass
        ExecutorService listenerExecutor = notificationListeners.get(listener);
        FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(
                backgroundService, listenerExecutor, listener.getMaxTries(),
                listener.getTimeout(), new NotificationListenerCallable(
                        listener, event), retryTimer, retryDelay);
        listenerExecutor.submit(listenerTask);

        // log how many notifications are pending
        if (listenerExecutor instanceof ThreadPoolExecutor) {
            BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
                    .getQueue();
            LOGGER.fine("[" + event.getNotificationReceiver().getName()
                    + "] listener (" + listener.getName() + ") has "
                    + pending.size() + " queued notifications");
        }
    }

    /**
     * Shut down the superclass, then the background service.
     *
     * The background service is released even when the superclass shutdown
     * throws, and calling this method when the notifier was never started
     * (or was already shut down) does not fail on the missing service.
     *
     * @throws Exception if the superclass shutdown fails
     */
    @Override
    public void shutdown() throws Exception {
        try {
            super.shutdown();
        } finally {
            // clear the field first so the service is never reused after
            // shutdown was requested
            final ExecutorService service = backgroundService;
            backgroundService = null;
            if (service != null) {
                service.shutdown();
            }
        }
    }

    /**
     * Create the background service, then start the superclass.
     *
     * An existing background service is kept, so a repeated call does not
     * orphan a previously created thread pool.
     *
     * @throws Exception if the superclass startup fails
     */
    @Override
    public void startup() throws Exception {
        if (backgroundService == null) {
            backgroundService = Executors.newCachedThreadPool();
        }
        super.startup();
    }
}