ExecutorListenerNotifier.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.aws.JsonNotificationIndex;
- import gov.usgs.earthquake.product.AbstractListener;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.ExecutorTask;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Timer;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- public class ExecutorListenerNotifier extends DefaultConfigurable implements
- ListenerNotifier {
- private static final Logger LOGGER = Logger
- .getLogger(ExecutorListenerNotifier.class.getName());
- private static ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();
- private DefaultNotificationReceiver receiver;
- /**
- * Notification listeners registered to receive notifications, and an
- * ExecutorService that delivers Notifications to each in a separate thread.
- */
- protected Map<NotificationListener, ExecutorService> notificationListeners = new HashMap<NotificationListener, ExecutorService>();
- /**
- * Make sure listener will accept notification before queueing it for
- * processing.
- */
- protected boolean acceptBeforeQueuing = true;
- /**
- * Timer used to retry tasks when they fail and listeners have configured
- * retryDelay.
- */
- protected Timer retryTimer = new Timer();
- /** When queue size reaches this level, start throttling */
- protected int throttleStartThreshold = 50000;
- /** When queue size reaches this level, stop throttling */
- protected int throttleStopThreshold = 25000;
- /** When throttling, wait this many milliseconds between queue size checks. */
- protected long throttleWaitInterval = 5000L;
- /**
- * Constructor
- * @param receiver DefaultNotificationReceiver
- */
- public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
- this.receiver = receiver;
- }
- /**
- * Add a new notification listener.
- *
- * @param listener
- * the listener to add. When notifications are received, this
- * listener will be notified.
- */
- @Override
- public void addNotificationListener(NotificationListener listener)
- throws Exception {
- if (!notificationListeners.containsKey(listener)) {
- // fixed thread pool allows us to inspect the queue length...
- int concurrentProducts = 1;
- if (listener instanceof DefaultNotificationListener) {
- concurrentProducts =
- ((DefaultNotificationListener) listener).getConcurrentProducts();
- }
- ExecutorService listenerExecutor = Executors.newFixedThreadPool(
- concurrentProducts);
- notificationListeners.put(listener, listenerExecutor);
- }
- }
- /**
- * Remove an existing notification listener.
- *
- * Any currently queued notifications are processed before shutting down.
- *
- * @param listener
- * the listener to remove. When notifications are receive, this
- * listener will no longer be notified.
- */
- @Override
- public void removeNotificationListener(NotificationListener listener)
- throws Exception {
- // remove listener from map
- ExecutorService listenerExecutor = notificationListeners
- .remove(listener);
- // shutdown executor thread
- listenerExecutor.shutdown();
- // Could use shutdownNow() instead?
- // however, shutdown() gives all listeners a chance to
- // process all notifications, but may keep client from shutting down
- // quickly. Also, see DefaultNotificationReceiver.shutdown().
- }
- /**
- * Send a notification to all registered NotificationListeners.
- *
- * Creates a NotificationEvent, with a reference to this object and calls
- * each notificationListeners onNotification method in separate threads.
- *
- * This method usually returns before registered NotificationListeners have
- * completed processing a notification.
- *
- * @param event
- * the notification being sent to listeners.
- * @throws Exception if error occurs
- */
- @Override
- public void notifyListeners(final NotificationEvent event) throws Exception {
- this.notifyListeners(event, this.notificationListeners.keySet());
- }
- /**
- * Calls queueNotification with event and listener for each listener
- * @param event NotificationEvent
- * @param listeners Collection of NotificationListeners
- * @throws Exception if error occurs
- */
- public void notifyListeners(final NotificationEvent event,
- final Collection<NotificationListener> listeners) throws Exception {
- Iterator<NotificationListener> iter = listeners.iterator();
- while (iter.hasNext()) {
- NotificationListener listener = iter.next();
- // only requeue for default notification listeners
- queueNotification(listener, event);
- }
- }
- /**
- * @param listener NotificationListener
- * @param event NotificationEvent
- */
- protected void queueNotification(final NotificationListener listener,
- final NotificationEvent event) {
- if (acceptBeforeQueuing
- && listener instanceof DefaultNotificationListener) {
- DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
- if (!defaultListener.accept(event.getNotification().getProductId())) {
- return;
- }
- }
- // determine retry delay
- long retryDelay = 0L;
- if (listener instanceof AbstractListener) {
- retryDelay = ((AbstractListener) listener).getRetryDelay();
- }
- ExecutorService listenerExecutor = notificationListeners.get(listener);
- ExecutorTask<Void> listenerTask = new ExecutorTask<Void>(
- listenerExecutor, listener.getMaxTries(),
- listener.getTimeout(), new NotificationListenerCallable(
- listener, event), retryTimer, retryDelay);
- listenerExecutor.submit(listenerTask);
- // log how many notifications are pending
- if (listenerExecutor instanceof ThreadPoolExecutor) {
- BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor)
- .getQueue();
- LOGGER.fine("[" + event.getNotificationReceiver().getName()
- + "] listener (" + listener.getName() + ") has "
- + pending.size() + " queued notifications");
- }
- }
- @Override
- public void shutdown() throws Exception {
- // remove all listeners
- Iterator<NotificationListener> iter = new ArrayList<NotificationListener>(
- notificationListeners.keySet()).iterator();
- while (iter.hasNext()) {
- removeNotificationListener(iter.next());
- }
- }
- @Override
- public void startup() throws Exception {
- super.startup();
- NotificationIndex index = receiver.getNotificationIndex();
- // filter down to listeners who can handle requeueing gracefully
- ArrayList<NotificationListener> gracefulListeners = new ArrayList<NotificationListener>();
- Iterator<NotificationListener> iter = this.notificationListeners
- .keySet().iterator();
- while (iter.hasNext()) {
- NotificationListener listener = iter.next();
- // make sure each index only notifies each listener once
- String key = listener.getName() + '|' + index.getName();
- if (AUTOLOADED_INDEXES.contains(key)) {
- // already loaded this notification index for this listener
- // another receiver is sharing this notification index
- } else if (listener instanceof DefaultNotificationListener
- && ((DefaultNotificationListener) listener)
- .getNotificationIndex() != null) {
- gracefulListeners.add(listener);
- AUTOLOADED_INDEXES.add(key);
- }
- }
- if (gracefulListeners.size() == 0) {
- // don't bother searching if nobody is listening
- return;
- }
- LOGGER.info("[" + receiver.getName()
- + "] requeueing notification index '" + index.getName() + "'");
- // find all existing notifications
- List<Notification> allNotifications = null;
- // for json index, push intersection into database if only one listener
- if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) {
- NotificationIndex listenerIndex =
- ((DefaultNotificationListener) gracefulListeners.get(0))
- .getNotificationIndex();
- if (listenerIndex instanceof JsonNotificationIndex
- && !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite")
- ) {
- // get intersection when potentially sharing database
- try {
- allNotifications =
- ((JsonNotificationIndex) index).getMissingNotifications(
- ((JsonNotificationIndex) listenerIndex).getTable());
- } catch (Exception e) {
- LOGGER.log(Level.INFO, "Exception loading intersection, continuing", e);
- }
- }
- }
- if (allNotifications == null) {
- // fallback to previous behavior
- allNotifications = index.findNotifications(
- (List<String>) null, (List<String>) null, (List<String>) null);
- }
- LOGGER.info("Done finding existing notifications");
- // queue them for processing in case they were previous missed
- Date now = new Date();
- int count = 0;
- for (final Notification notification : allNotifications) {
- NotificationEvent event = new NotificationEvent(receiver, notification);
- count += 1;
- if (event.getNotification().getExpirationDate().after(now)) {
- // still valid
- this.notifyListeners(event, gracefulListeners);
- }
- // try to keep queue size managable during restart
- throttleQueues(allNotifications.size() - count);
- }
- LOGGER.info("All notifications queued");
- // keep track that we've processed this notification index
- AUTOLOADED_INDEXES.add(index.getName());
- }
- /** @return default notification receiver */
- public DefaultNotificationReceiver getReceiver() {
- return receiver;
- }
- /** @param receiver of the default notification variety */
- public void setReceiver(DefaultNotificationReceiver receiver) {
- this.receiver = receiver;
- }
- /** @return map of status */
- public Map<String, Integer> getStatus() {
- HashMap<String, Integer> status = new HashMap<String, Integer>();
- for (final NotificationListener listener : notificationListeners.keySet()) {
- ExecutorService listenerExecutor = notificationListeners.get(listener);
- if (listenerExecutor instanceof ThreadPoolExecutor) {
- // check how many notifications are pending
- int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
- status.put(receiver.getName() + " - " + listener.getName(), size);
- }
- }
- return status;
- }
- /**
- * Check queue status and return length of longest queue.
- *
- * @return length of longest queue, or null if no queue lengths.
- */
- public Integer getMaxQueueSize() {
- Integer maxSize = null;
- for (final NotificationListener listener : notificationListeners.keySet()) {
- ExecutorService listenerExecutor = notificationListeners.get(listener);
- if (listenerExecutor instanceof ThreadPoolExecutor) {
- // check how many notifications are pending
- int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
- if (maxSize == null || size > maxSize) {
- maxSize = size;
- }
- }
- }
- return maxSize;
- }
- /**
- * If longest queue has more than 50k notifications,
- * wait until longest queue has 25k notifications before returning.
- *
- * @throws InterruptedException if error occurs
- */
- public void throttleQueues() throws InterruptedException {
- throttleQueues(null);
- }
- /**
- * If longest queue has more than 50k notifications,
- * wait until longest queue has 25k notifications before returning.
- *
- * @param remaining integer
- * @throws InterruptedException if error occurs
- */
- public void throttleQueues(Integer remaining) throws InterruptedException {
- // try to keep queue size managable during restart
- int limit = throttleStartThreshold;
- // track whether any throttles occurred
- boolean throttled = false;
- while (true) {
- final Integer size = getMaxQueueSize();
- if (size == null || size <= limit) {
- // within limit
- if (throttled) {
- LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
- }
- break;
- }
- throttled = true;
- LOGGER.info("[" + getName() + "]"
- + " queueing throttled until below "
- + throttleStopThreshold
- + " ("
- + "size=" + size
- + ", remaining=" + (remaining == null ? "?" : remaining)
- + ")");
- // too many messages queued
- // set limit to stop threshold
- limit = throttleStopThreshold;
- // wait for listener to do some processing
- // 5s is a little low, but don't want to wait too long
- Thread.sleep(throttleWaitInterval);
- }
- }
- /**
- * NOTE: messing with the executors map is not a good idea.
- *
- * @return the map of listeners and their executors.
- */
- public Map<NotificationListener, ExecutorService> getExecutors() {
- return notificationListeners;
- }
- /** @return int throttle start threshold */
- public int getThrottleStartThreshold() { return this.throttleStartThreshold; }
- /** @param n int throttle start threshold */
- public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; }
- /** @return int throttle stop threshold */
- public int getThrottleStopThreshold() { return this.throttleStopThreshold; }
- /** @param n int throttle stop threshold */
- public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; }
- /** @return int throttle wait interval */
- public long getThrottleWaitInterval() { return this.throttleWaitInterval; }
- /** @param ms long throttle wait interval in ms */
- public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; }
- }