NotificationIndexCleanup.java

  1. package gov.usgs.earthquake.distribution;

  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.logging.Level;
  5. import java.util.logging.Logger;

  6. /**
  7.  * NotificationIndexCleanup manages cleaning up expired notifications.
  8.  *
  9.  * Uses background thread to remove expired notifications while they exist,
  10.  * then uses wait/notify to pause until shutdown() or wakeUp() methods are
  11.  * called.
  12.  *
  13.  * NOTE: this class does not schedule periodic cleanup, and the wakeUp() method
  14.  * must be called periodically.
  15.  */
  16. public class NotificationIndexCleanup implements Runnable {

  17.   private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName());

  18.   public final NotificationIndex index;

  19.   // listener that can take additional actions during cleanup
  20.   public final Listener listener;

  21.   // object used to synchronize state access between threads
  22.   public final Object syncObject = new Object();
  23.   // thread where cleanup loop runs
  24.   public Thread cleanupThread = null;
  25.   // whether thread should stop running
  26.   private boolean stopThread = false;

  27.   public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) {
  28.     this.index = index;
  29.     this.listener = listener;
  30.   }

  31.   /**
  32.    * Notification cleanup thread loop.
  33.    *
  34.    * This method blocks and should probably not be called by you.
  35.    */
  36.   public void run() {
  37.     final String indexName = this.index.getName();

  38.     LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting");
  39.     // run until thread stopped
  40.     while (!stopThread) {
  41.       List<Notification> expiredNotifications = null;
  42.       synchronized (syncObject) {
  43.         try {
  44.           expiredNotifications = this.index.findExpiredNotifications();
  45.         } catch (Exception e) {
  46.           LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications");
  47.         }
  48.         if (expiredNotifications == null || expiredNotifications.size() == 0) {
  49.           // Wait for expired notifications to process
  50.           try {
  51.             syncObject.wait();
  52.           } catch (InterruptedException ignore) {
  53.             // signal from another thread (stopThread checked above)
  54.             continue;
  55.           }
  56.         }
  57.       }

  58.       // remove batch of expired notifications
  59.       final List<Notification> removed = new ArrayList<>(expiredNotifications.size());
  60.       if (this.listener == null) {
  61.         removed.addAll(expiredNotifications);
  62.       } else {
  63.         // notify listener, remove only those successfully processed by listener
  64.         for (final Notification expired : expiredNotifications) {
  65.           synchronized (syncObject) {
  66.             if (stopThread) {
  67.               break;
  68.             }
  69.           }
  70.           try {
  71.             this.listener.onExpiredNotification(expired);
  72.             removed.add(expired);
  73.           } catch (Exception e) {
  74.             LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Listener exception processing expired notification");
  75.           }
  76.         }
  77.       }
  78.       try {
  79.         // remove in batch
  80.         this.index.removeNotifications(removed);
  81.         LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications");
  82.       } catch (Exception e) {
  83.         LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications");
  84.       }
  85.     }
  86.     LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting");
  87.     this.cleanupThread = null;
  88.   }

  89.   /**
  90.    * Start cleanup process.
  91.    *
  92.    * @throws Exception
  93.    */
  94.   public void startup() throws Exception {
  95.     synchronized (syncObject) {
  96.       if (this.cleanupThread != null) {
  97.         throw new IllegalStateException("Already started");
  98.       }
  99.       // start thread
  100.       stopThread = false;
  101.       this.cleanupThread = new Thread(this);
  102.     }
  103.     this.cleanupThread.start();
  104.   }

  105.   /**
  106.    * Stop cleanup process.
  107.    *
  108.    * @throws Exception
  109.    */
  110.   public void shutdown() throws Exception {
  111.     synchronized (syncObject) {
  112.       if (this.cleanupThread == null) {
  113.         throw new IllegalStateException("Already stopped");
  114.       }
  115.       // stop thread
  116.       stopThread = true;
  117.       this.cleanupThread.interrupt();
  118.     }
  119.     this.cleanupThread.join();
  120.   }

  121.   /**
  122.    * Wake up the background thread if it is waiting.
  123.    */
  124.   public void wakeUp() {
  125.     synchronized (syncObject) {
  126.       syncObject.notify();
  127.     }
  128.   }

  129.   /**
  130.    * Interface for cleanup listeners to take additional steps before a
  131.    * notification is removed.
  132.    */
  133.   public static interface Listener {
  134.     public void onExpiredNotification(final Notification expired) throws Exception;
  135.   }
  136. }