NotificationIndexCleanup.java
package gov.usgs.earthquake.distribution;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* NotificationIndexCleanup manages cleaning up expired notifications.
*
* Uses background thread to remove expired notifications while they exist,
* then uses wait/notify to pause until shutdown() or wakeUp() methods are
* called.
*
* NOTE: this class does not schedule periodic cleanup, and the wakeUp() method
* must be called periodically.
*/
public class NotificationIndexCleanup implements Runnable {
private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName());
public final NotificationIndex index;
// listener that can take additional actions during cleanup
public final Listener listener;
// object used to synchronize state access between threads
public final Object syncObject = new Object();
// thread where cleanup loop runs
public Thread cleanupThread = null;
// whether thread should stop running
private boolean stopThread = false;
public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) {
this.index = index;
this.listener = listener;
}
/**
* Notification cleanup thread loop.
*
* This method blocks and should probably not be called by you.
*/
public void run() {
final String indexName = this.index.getName();
LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting");
// run until thread stopped
while (!stopThread) {
List<Notification> expiredNotifications = null;
synchronized (syncObject) {
try {
expiredNotifications = this.index.findExpiredNotifications();
} catch (Exception e) {
LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications");
}
if (expiredNotifications == null || expiredNotifications.size() == 0) {
// Wait for expired notifications to process
try {
syncObject.wait();
} catch (InterruptedException ignore) {
// signal from another thread (stopThread checked above)
continue;
}
}
}
// remove batch of expired notifications
final List<Notification> removed = new ArrayList<>(expiredNotifications.size());
if (this.listener == null) {
removed.addAll(expiredNotifications);
} else {
// notify listener, remove only those successfully processed by listener
for (final Notification expired : expiredNotifications) {
synchronized (syncObject) {
if (stopThread) {
break;
}
}
try {
this.listener.onExpiredNotification(expired);
removed.add(expired);
} catch (Exception e) {
LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Listener exception processing expired notification");
}
}
}
try {
// remove in batch
this.index.removeNotifications(removed);
LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications");
} catch (Exception e) {
LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications");
}
}
LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting");
this.cleanupThread = null;
}
/**
* Start cleanup process.
*
* @throws Exception
*/
public void startup() throws Exception {
synchronized (syncObject) {
if (this.cleanupThread != null) {
throw new IllegalStateException("Already started");
}
// start thread
stopThread = false;
this.cleanupThread = new Thread(this);
}
this.cleanupThread.start();
}
/**
* Stop cleanup process.
*
* @throws Exception
*/
public void shutdown() throws Exception {
synchronized (syncObject) {
if (this.cleanupThread == null) {
throw new IllegalStateException("Already stopped");
}
// stop thread
stopThread = true;
this.cleanupThread.interrupt();
}
this.cleanupThread.join();
}
/**
* Wake up the background thread if it is waiting.
*/
public void wakeUp() {
synchronized (syncObject) {
syncObject.notify();
}
}
/**
* Interface for cleanup listeners to take additional steps before a
* notification is removed.
*/
public static interface Listener {
public void onExpiredNotification(final Notification expired) throws Exception;
}
}