ExecutorListenerNotifier.java
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.aws.JsonNotificationIndex;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.ExecutorTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ExecutorListenerNotifier extends DefaultConfigurable implements
ListenerNotifier {
private static final Logger LOGGER = Logger
.getLogger(ExecutorListenerNotifier.class.getName());
private static ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();
private DefaultNotificationReceiver receiver;
/**
* Notification listeners registered to receive notifications, and an
* ExecutorService that delivers Notifications to each in a separate thread.
*/
protected Map<NotificationListener, ExecutorService> notificationListeners = new HashMap<NotificationListener, ExecutorService>();
/**
* Make sure listener will accept notification before queueing it for
* processing.
*/
protected boolean acceptBeforeQueuing = true;
/**
* Timer used to retry tasks when they fail and listeners have configured
* retryDelay.
*/
protected Timer retryTimer = new Timer();
/** When queue size reaches this level, start throttling */
protected int throttleStartThreshold = 50000;
/** When queue size reaches this level, stop throttling */
protected int throttleStopThreshold = 25000;
/** When throttling, wait this many milliseconds between queue size checks. */
protected long throttleWaitInterval = 5000L;
/**
* Constructor
* @param receiver DefaultNotificationReceiver
*/
public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
this.receiver = receiver;
}
/**
* Add a new notification listener.
*
* @param listener
* the listener to add. When notifications are received, this
* listener will be notified.
*/
@Override
public void addNotificationListener(NotificationListener listener)
throws Exception {
if (!notificationListeners.containsKey(listener)) {
// fixed thread pool allows us to inspect the queue length...
int concurrentProducts = 1;
if (listener instanceof DefaultNotificationListener) {
concurrentProducts =
((DefaultNotificationListener) listener).getConcurrentProducts();
}
ExecutorService listenerExecutor = Executors.newFixedThreadPool(
concurrentProducts);
notificationListeners.put(listener, listenerExecutor);
}
}
/**
* Remove an existing notification listener.
*
* Any currently queued notifications are processed before shutting down.
*
* @param listener
* the listener to remove. When notifications are receive, this
* listener will no longer be notified.
*/
@Override
public void removeNotificationListener(NotificationListener listener)
throws Exception {
// remove listener from map
ExecutorService listenerExecutor = notificationListeners
.remove(listener);
// shutdown executor thread
listenerExecutor.shutdown();
// Could use shutdownNow() instead?
// however, shutdown() gives all listeners a chance to
// process all notifications, but may keep client from shutting down
// quickly. Also, see DefaultNotificationReceiver.shutdown().
}
/**
* Send a notification to all registered NotificationListeners.
*
* Creates a NotificationEvent, with a reference to this object and calls
* each notificationListeners onNotification method in separate threads.
*
* This method usually returns before registered NotificationListeners have
* completed processing a notification.
*
* @param event
* the notification being sent to listeners.
* @throws Exception if error occurs
*/
@Override
public void notifyListeners(final NotificationEvent event) throws Exception {
this.notifyListeners(event, this.notificationListeners.keySet());
}
/**
* Calls queueNotification with event and listener for each listener
* @param event NotificationEvent
* @param listeners Collection of NotificationListeners
* @throws Exception if error occurs
*/
public void notifyListeners(final NotificationEvent event,
final Collection<NotificationListener> listeners) throws Exception {
Iterator<NotificationListener> iter = listeners.iterator();
while (iter.hasNext()) {
NotificationListener listener = iter.next();
// only requeue for default notification listeners
queueNotification(listener, event);
}
}
/**
* @param listener NotificationListener
* @param event NotificationEvent
*/
protected void queueNotification(final NotificationListener listener,
final NotificationEvent event) {
if (acceptBeforeQueuing
&& listener instanceof DefaultNotificationListener) {
DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
if (!defaultListener.accept(event.getNotification().getProductId())) {
return;
}
}
// determine retry delay
long retryDelay = 0L;
if (listener instanceof AbstractListener) {
retryDelay = ((AbstractListener) listener).getRetryDelay();
}
ExecutorService listenerExecutor = notificationListeners.get(listener);
ExecutorTask<Void> listenerTask = new ExecutorTask<Void>(
listenerExecutor, listener.getMaxTries(),
listener.getTimeout(), new NotificationListenerCallable(
listener, event), retryTimer, retryDelay);
listenerExecutor.submit(listenerTask);
// log how many notifications are pending
if (listenerExecutor instanceof ThreadPoolExecutor) {
BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
.getQueue();
LOGGER.fine("[" + event.getNotificationReceiver().getName()
+ "] listener (" + listener.getName() + ") has "
+ pending.size() + " queued notifications");
}
}
@Override
public void shutdown() throws Exception {
// remove all listeners
Iterator<NotificationListener> iter = new ArrayList<NotificationListener>(
notificationListeners.keySet()).iterator();
while (iter.hasNext()) {
removeNotificationListener(iter.next());
}
}
@Override
public void startup() throws Exception {
super.startup();
NotificationIndex index = receiver.getNotificationIndex();
// filter down to listeners who can handle requeueing gracefully
ArrayList<NotificationListener> gracefulListeners = new ArrayList<NotificationListener>();
Iterator<NotificationListener> iter = this.notificationListeners
.keySet().iterator();
while (iter.hasNext()) {
NotificationListener listener = iter.next();
// make sure each index only notifies each listener once
String key = listener.getName() + '|' + index.getName();
if (AUTOLOADED_INDEXES.contains(key)) {
// already loaded this notification index for this listener
// another receiver is sharing this notification index
} else if (listener instanceof DefaultNotificationListener
&& ((DefaultNotificationListener) listener)
.getNotificationIndex() != null) {
gracefulListeners.add(listener);
AUTOLOADED_INDEXES.add(key);
}
}
if (gracefulListeners.size() == 0) {
// don't bother searching if nobody is listening
return;
}
LOGGER.info("[" + receiver.getName()
+ "] requeueing notification index '" + index.getName() + "'");
// find all existing notifications
List<Notification> allNotifications = null;
// for json index, push intersection into database if only one listener
if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) {
NotificationIndex listenerIndex =
((DefaultNotificationListener) gracefulListeners.get(0))
.getNotificationIndex();
if (listenerIndex instanceof JsonNotificationIndex
&& !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite")
) {
// get intersection when potentially sharing database
try {
allNotifications =
((JsonNotificationIndex) index).getMissingNotifications(
((JsonNotificationIndex) listenerIndex).getTable());
} catch (Exception e) {
LOGGER.log(Level.INFO, "Exception loading intersection, continuing", e);
}
}
}
if (allNotifications == null) {
// fallback to previous behavior
allNotifications = index.findNotifications(
(List<String>) null, (List<String>) null, (List<String>) null);
}
LOGGER.info("Done finding existing notifications");
// queue them for processing in case they were previous missed
Date now = new Date();
int count = 0;
for (final Notification notification : allNotifications) {
NotificationEvent event = new NotificationEvent(receiver, notification);
count += 1;
if (event.getNotification().getExpirationDate().after(now)) {
// still valid
this.notifyListeners(event, gracefulListeners);
}
// try to keep queue size managable during restart
throttleQueues(allNotifications.size() - count);
}
LOGGER.info("All notifications queued");
// keep track that we've processed this notification index
AUTOLOADED_INDEXES.add(index.getName());
}
/** @return default notification receiver */
public DefaultNotificationReceiver getReceiver() {
return receiver;
}
/** @param receiver of the default notification variety */
public void setReceiver(DefaultNotificationReceiver receiver) {
this.receiver = receiver;
}
/** @return map of status */
public Map<String, Integer> getStatus() {
HashMap<String, Integer> status = new HashMap<String, Integer>();
for (final NotificationListener listener : notificationListeners.keySet()) {
ExecutorService listenerExecutor = notificationListeners.get(listener);
if (listenerExecutor instanceof ThreadPoolExecutor) {
// check how many notifications are pending
int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
status.put(receiver.getName() + " - " + listener.getName(), size);
}
}
return status;
}
/**
* Check queue status and return length of longest queue.
*
* @return length of longest queue, or null if no queue lengths.
*/
public Integer getMaxQueueSize() {
Integer maxSize = null;
for (final NotificationListener listener : notificationListeners.keySet()) {
ExecutorService listenerExecutor = notificationListeners.get(listener);
if (listenerExecutor instanceof ThreadPoolExecutor) {
// check how many notifications are pending
int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
if (maxSize == null || size > maxSize) {
maxSize = size;
}
}
}
return maxSize;
}
/**
* If longest queue has more than 50k notifications,
* wait until longest queue has 25k notifications before returning.
*
* @throws InterruptedException if error occurs
*/
public void throttleQueues() throws InterruptedException {
throttleQueues(null);
}
/**
* If longest queue has more than 50k notifications,
* wait until longest queue has 25k notifications before returning.
*
* @param remaining integer
* @throws InterruptedException if error occurs
*/
public void throttleQueues(Integer remaining) throws InterruptedException {
// try to keep queue size managable during restart
int limit = throttleStartThreshold;
// track whether any throttles occurred
boolean throttled = false;
while (true) {
final Integer size = getMaxQueueSize();
if (size == null || size <= limit) {
// within limit
if (throttled) {
LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
}
break;
}
throttled = true;
LOGGER.info("[" + getName() + "]"
+ " queueing throttled until below "
+ throttleStopThreshold
+ " ("
+ "size=" + size
+ ", remaining=" + (remaining == null ? "?" : remaining)
+ ")");
// too many messages queued
// set limit to stop threshold
limit = throttleStopThreshold;
// wait for listener to do some processing
// 5s is a little low, but don't want to wait too long
Thread.sleep(throttleWaitInterval);
}
}
/**
* NOTE: messing with the executors map is not a good idea.
*
* @return the map of listeners and their executors.
*/
public Map<NotificationListener, ExecutorService> getExecutors() {
return notificationListeners;
}
/** @return int throttle start threshold */
public int getThrottleStartThreshold() { return this.throttleStartThreshold; }
/** @param n int throttle start threshold */
public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; }
/** @return int throttle stop threshold */
public int getThrottleStopThreshold() { return this.throttleStopThreshold; }
/** @param n int throttle stop threshold */
public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; }
/** @return int throttle wait interval */
public long getThrottleWaitInterval() { return this.throttleWaitInterval; }
/** @param ms long throttle wait interval in ms */
public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; }
}