DefaultNotificationListener.java
/*
* DefaultNotificationListener
*/
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.ObjectLock;
import gov.usgs.util.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A base implementation of a notification listener. Implements functionality
* that is useful for most notification listeners.
*
* Sub classes should override the onProduct(Product) method to add custom
* processing.
*
* The DefaultNotificationListener extends the AbstractListener and can use any
* of those configuration parameters.
*
* @see gov.usgs.earthquake.product.AbstractListener
*/
public class DefaultNotificationListener extends AbstractListener implements
NotificationListener, NotificationIndexCleanup.Listener {
/** Logging object. */
private static final Logger LOGGER = Logger
.getLogger(DefaultNotificationListener.class.getName());
/** Property referencing a notification index config section. */
public static final String NOTIFICATION_INDEX_PROPERTY = "listenerIndex";
/** Property for listener index file */
public static final String INDEX_FILE_PROPERTY = "listenerIndexFile";
/** How long to wait until checking for expired notifications/products. */
public static final String CLEANUP_INTERVAL_PROPERTY = "cleanupInterval";
/** Default time to wait for cleanup. 1h */
public static final String DEFAULT_CLEANUP_INTERVAL = "3600000";
/** Property for concurrentProducts */
public static final String CONCURRENT_PRODUCTS_PROPERTY = "concurrentProducts";
/** How many products to process at a time. */
public static final String DEFAULT_CONCURRENT_PRODUCTS = "1";
/** Whether or not to process products more than once. */
public static final String PROCESS_DUPLICATES = "processDuplicates";
/** Default for process duplicates. False */
public static final String DEFAULT_PROCESS_DUPLICATES = "false";
/** Filter products based on content paths they contain. */
public static final String INCLUDE_PATHS_PROPERTY = "includePaths";
/** Property for exludePaths */
public static final String EXCLUDE_PATHS_PROPERTY = "excludePaths";
/** Optional notification index. */
private NotificationIndex notificationIndex = null;
/** How often to run cleanup task, in ms, <=0 = off. */
private Long cleanupInterval = 0L;
/** Timer that schedules sender cleanup task. */
private Timer cleanupTimer = null;
/** Notification index cleanup. */
private NotificationIndexCleanup notificationCleanup = null;
/** How many products to process at the same time. */
private int concurrentProducts = 1;
/** Whether or not to process products that have already been processed. */
private boolean processDuplicates = false;
/** Array of content paths to search. */
private final ArrayList<String> includePaths = new ArrayList<String>();
/** Array of content paths to search. */
private final ArrayList<String> excludePaths = new ArrayList<String>();
/**
* Locks used to keep concurrent listener from processing product
* more than once at the same time.
*/
private ObjectLock<ProductId> storageLocks = new ObjectLock<ProductId>();
/**
* Implement the NotificationListener interface.
*
* This method calls accept, and if accept returns true, retrieves the
* product and calls onProduct.
*/
public void onNotification(final NotificationEvent event) throws Exception {
Notification notification = event.getNotification();
ProductId id = notification.getProductId();
String productId = id.toString();
LOGGER.finest("[" + getName() + "] received notification for id="
+ productId);
if (!accept(id)) {
LOGGER.finest("[" + getName() + "] rejected notification for id="
+ productId);
return;
}
// only allow one thread to process a product
storageLocks.acquireLock(id);
try {
if (!onBeforeProcessNotification(notification)) {
return;
}
LOGGER.finer("[" + getName() + "] processing notification for id="
+ productId);
Product product = event.getProduct();
if (product == null) {
throw new ContinuableListenerException("retrieved product null,"
+ " notification id=" + productId);
}
if (!onBeforeProcessProduct(product)) {
return;
}
LOGGER.finer("[" + getName() + "] processing product for id="
+ productId);
onProduct(product);
onAfterProcessNotification(notification);
} finally {
// be sure to release lock when done/error
storageLocks.releaseLock(id);
}
}
/**
* Called by onNotification when a product is retrieved.
*
* @param product
* a product whose notification was accepted.
* @throws Exception if error occurs
*/
public void onProduct(final Product product) throws Exception {
// subclasses do stuff here
ProductId id = product.getId();
StringBuffer b = new StringBuffer("[" + getName()
+ "] product processed source=" + id.getSource() + ", type="
+ id.getType() + ", code=" + id.getCode() + ", updateTime="
+ id.getUpdateTime().toString());
Map<String, String> properties = product.getProperties();
Iterator<String> iter = properties.keySet().iterator();
while (iter.hasNext()) {
String name = iter.next();
b.append(", ").append(name).append("=")
.append(properties.get(name));
}
LOGGER.info(b.toString());
System.out.println(b.toString());
}
/**
* Called just before this listener processes a notification.
*
* @param notification
* notification about to be processed.
* @return true to process the notification, false to skip
* @throws Exception if error occurs
*/
protected boolean onBeforeProcessNotification(
final Notification notification) throws Exception {
// only check if we care
if (!processDuplicates && this.notificationIndex != null) {
List<Notification> notifications = this.notificationIndex
.findNotifications(notification.getProductId());
if (notifications.size() > 0) {
LOGGER.finer("[" + getName()
+ "] skipping existing product "
+ notification.getProductId().toString());
return false;
}
}
return true;
}
/**
* Called after a product has been downloaded, but before onProduct is
* called.
*
* Sometimes a listener cannot tell whether it should process a product
* until its contents are available. This is where the "includePaths" and
* "excludePaths" are evaluated.
*
* @param product
* product about to be processed.
* @return true to process the product, false to skip
* @throws Exception if error occurs
*/
protected boolean onBeforeProcessProduct(final Product product)
throws Exception {
if (excludePaths.size() != 0) {
Map<String, Content> contents = product.getContents();
Iterator<String> pathIter = excludePaths.iterator();
while (pathIter.hasNext()) {
String path = pathIter.next();
if (contents.containsKey(path)) {
// contains at least one matching include path
LOGGER.fine("[" + getName() + "] skipping product "
+ product.getId().toString() + ", '" + path
+ "' matches excludePaths");
return false;
}
}
}
if (includePaths.size() != 0) {
boolean containsPath = false;
Map<String, Content> contents = product.getContents();
Iterator<String> pathIter = includePaths.iterator();
while (pathIter.hasNext()) {
String path = pathIter.next();
if (contents.containsKey(path)) {
// contains at least one matching include path
containsPath = true;
break;
}
}
if (!containsPath) {
LOGGER.fine("[" + getName() + "] skipping product "
+ product.getId().toString()
+ ", does not match includePaths");
return false;
}
}
return true;
}
/**
* Called when this listener has successfully processed a notification.
*
* @param notification
* notification that was processed.
* @throws Exception if error occurs
*/
protected void onAfterProcessNotification(final Notification notification)
throws Exception {
if (this.notificationIndex != null) {
this.notificationIndex.addNotification(notification);
}
}
/**
* Called when an expired notification is being removed from the index.
*
* @param notification to be removed
* @throws Exception if error occurs
*/
@Override
public void onExpiredNotification(final Notification notification)
throws Exception {
// nothing to do
}
/**
* Periodic cleanup task.
*
* Called every cleanupInterval milliseconds.
*/
public void cleanup() throws Exception {
LOGGER.finer("[" + getName() + "] running listener cleanup");
if (this.notificationCleanup == null) {
this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
this.notificationCleanup.startup();
} else {
this.notificationCleanup.wakeUp();
}
}
@Override
public void startup() throws Exception {
super.startup();
if (this.notificationIndex != null) {
this.notificationIndex.startup();
}
// only schedule cleanup if interval is non-zero
if (cleanupInterval > 0) {
cleanupTimer = new Timer();
cleanupTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
cleanup();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName()
+ "] exception during cleanup", e);
}
}
}, 0, cleanupInterval);
}
}
@Override
public void shutdown() throws Exception {
super.shutdown();
if (this.notificationCleanup != null) {
try {
this.notificationCleanup.shutdown();
} catch (Exception e) {
LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e);
} finally {
this.notificationCleanup = null;
}
}
try {
this.notificationIndex.shutdown();
} catch (Exception e) {
// ignore
}
try {
this.cleanupTimer.cancel();
} catch (Exception e) {
// ignore
}
}
@Override
public void configure(final Config config) throws Exception {
super.configure(config);
String notificationIndexName = config
.getProperty(NOTIFICATION_INDEX_PROPERTY);
String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
if (notificationIndexName != null) {
LOGGER.config("[" + getName() + "] loading notification index '"
+ notificationIndexName + "'");
notificationIndex = (NotificationIndex) Config.getConfig()
.getObject(notificationIndexName);
if (notificationIndex == null) {
throw new ConfigurationException("[" + getName() + "] index '"
+ notificationIndexName
+ "' is not properly configured");
}
} else if (notificationIndexFile != null) {
LOGGER.config("[" + getName() + "] using notification index '"
+ notificationIndexFile + "'");
notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
}
cleanupInterval = Long.parseLong(config.getProperty(
CLEANUP_INTERVAL_PROPERTY, DEFAULT_CLEANUP_INTERVAL));
LOGGER.config("[" + getName() + "] cleanup interval = "
+ processDuplicates);
concurrentProducts = Integer.parseInt(config.getProperty(
CONCURRENT_PRODUCTS_PROPERTY, DEFAULT_CONCURRENT_PRODUCTS));
LOGGER.config("[" + getName() + "] concurrent products = "
+ concurrentProducts);
processDuplicates = Boolean.valueOf(config.getProperty(
PROCESS_DUPLICATES, DEFAULT_PROCESS_DUPLICATES));
LOGGER.config("[" + getName() + "] process duplicates = "
+ processDuplicates);
includePaths.addAll(StringUtils.split(
config.getProperty(INCLUDE_PATHS_PROPERTY), ","));
LOGGER.config("[" + getName() + "] include paths = " + includePaths);
excludePaths.addAll(StringUtils.split(
config.getProperty(EXCLUDE_PATHS_PROPERTY), ","));
LOGGER.config("[" + getName() + "] exclude paths = " + excludePaths);
}
/** @return notificationIndex */
public NotificationIndex getNotificationIndex() {
return notificationIndex;
}
/** @param notificationIndex to set */
public void setNotificationIndex(NotificationIndex notificationIndex) {
this.notificationIndex = notificationIndex;
}
/** @return cleanupInterval */
public Long getCleanupInterval() {
return cleanupInterval;
}
/** @param cleanupInterval long to set */
public void setCleanupInterval(Long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}
/** @return concurrentProducts */
public int getConcurrentProducts() {
return concurrentProducts;
}
/** @param concurrentProducts int to set */
public void setConcurrentProducts(int concurrentProducts) {
this.concurrentProducts = concurrentProducts;
}
/** @return processDuplicates */
public boolean isProcessDuplicates() {
return processDuplicates;
}
/** @param processDuplicates boolean to set */
public void setProcessDuplicates(boolean processDuplicates) {
this.processDuplicates = processDuplicates;
}
/**
* @return the includePaths
*/
public ArrayList<String> getIncludePaths() {
return includePaths;
}
/**
* @return the excludePaths
*/
public ArrayList<String> getExcludePaths() {
return excludePaths;
}
}