NotificationIndexCleanup.java
- package gov.usgs.earthquake.distribution;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * NotificationIndexCleanup manages cleaning up expired notifications.
- *
- * Uses background thread to remove expired notifications while they exist,
- * then uses wait/notify to pause until shutdown() or wakeUp() methods are
- * called.
- *
- * NOTE: this class does not schedule periodic cleanup, and the wakeUp() method
- * must be called periodically.
- */
- public class NotificationIndexCleanup implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName());
- public final NotificationIndex index;
- // listener that can take additional actions during cleanup
- public final Listener listener;
- // object used to synchronize state access between threads
- public final Object syncObject = new Object();
- // thread where cleanup loop runs
- public Thread cleanupThread = null;
- // whether thread should stop running
- private boolean stopThread = false;
- public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) {
- this.index = index;
- this.listener = listener;
- }
- /**
- * Notification cleanup thread loop.
- *
- * This method blocks and should probably not be called by you.
- */
- public void run() {
- final String indexName = this.index.getName();
- LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting");
- // run until thread stopped
- while (!stopThread) {
- List<Notification> expiredNotifications = null;
- synchronized (syncObject) {
- try {
- expiredNotifications = this.index.findExpiredNotifications();
- } catch (Exception e) {
- LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications");
- }
- if (expiredNotifications == null || expiredNotifications.size() == 0) {
- // Wait for expired notifications to process
- try {
- syncObject.wait();
- } catch (InterruptedException ignore) {
- // signal from another thread (stopThread checked above)
- continue;
- }
- }
- }
- // remove batch of expired notifications
- final List<Notification> removed = new ArrayList<>(expiredNotifications.size());
- if (this.listener == null) {
- removed.addAll(expiredNotifications);
- } else {
- // notify listener, remove only those successfully processed by listener
- for (final Notification expired : expiredNotifications) {
- synchronized (syncObject) {
- if (stopThread) {
- break;
- }
- }
- try {
- this.listener.onExpiredNotification(expired);
- removed.add(expired);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Listener exception processing expired notification");
- }
- }
- }
- try {
- // remove in batch
- this.index.removeNotifications(removed);
- LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications");
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications");
- }
- }
- LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting");
- this.cleanupThread = null;
- }
- /**
- * Start cleanup process.
- *
- * @throws Exception
- */
- public void startup() throws Exception {
- synchronized (syncObject) {
- if (this.cleanupThread != null) {
- throw new IllegalStateException("Already started");
- }
- // start thread
- stopThread = false;
- this.cleanupThread = new Thread(this);
- }
- this.cleanupThread.start();
- }
- /**
- * Stop cleanup process.
- *
- * @throws Exception
- */
- public void shutdown() throws Exception {
- synchronized (syncObject) {
- if (this.cleanupThread == null) {
- throw new IllegalStateException("Already stopped");
- }
- // stop thread
- stopThread = true;
- this.cleanupThread.interrupt();
- }
- this.cleanupThread.join();
- }
- /**
- * Wake up the background thread if it is waiting.
- */
- public void wakeUp() {
- synchronized (syncObject) {
- syncObject.notify();
- }
- }
- /**
- * Interface for cleanup listeners to take additional steps before a
- * notification is removed.
- */
- public static interface Listener {
- public void onExpiredNotification(final Notification expired) throws Exception;
- }
- }