DefaultNotificationListener.java
- /*
- * DefaultNotificationListener
- */
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.AbstractListener;
- import gov.usgs.earthquake.product.Content;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.util.Config;
- import gov.usgs.util.ObjectLock;
- import gov.usgs.util.StringUtils;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * A base implementation of a notification listener. Implements functionality
- * that is useful for most notification listeners.
- *
- * Sub classes should override the onProduct(Product) method to add custom
- * processing.
- *
- * The DefaultNotificationListener extends the AbstractListener and can use any
- * of those configuration parameters.
- *
- * @see gov.usgs.earthquake.product.AbstractListener
- */
- public class DefaultNotificationListener extends AbstractListener implements
- NotificationListener, NotificationIndexCleanup.Listener {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(DefaultNotificationListener.class.getName());
- /** Property referencing a notification index config section. */
- public static final String NOTIFICATION_INDEX_PROPERTY = "listenerIndex";
- /** Property for listener index file */
- public static final String INDEX_FILE_PROPERTY = "listenerIndexFile";
- /** How long to wait until checking for expired notifications/products. */
- public static final String CLEANUP_INTERVAL_PROPERTY = "cleanupInterval";
- /** Default time to wait for cleanup. 1h */
- public static final String DEFAULT_CLEANUP_INTERVAL = "3600000";
- /** Property for concurrentProducts */
- public static final String CONCURRENT_PRODUCTS_PROPERTY = "concurrentProducts";
- /** How many products to process at a time. */
- public static final String DEFAULT_CONCURRENT_PRODUCTS = "1";
- /** Whether or not to process products more than once. */
- public static final String PROCESS_DUPLICATES = "processDuplicates";
- /** Default for process duplicates. False */
- public static final String DEFAULT_PROCESS_DUPLICATES = "false";
- /** Filter products based on content paths they contain. */
- public static final String INCLUDE_PATHS_PROPERTY = "includePaths";
- /** Property for exludePaths */
- public static final String EXCLUDE_PATHS_PROPERTY = "excludePaths";
- /** Optional notification index. */
- private NotificationIndex notificationIndex = null;
- /** How often to run cleanup task, in ms, <=0 = off. */
- private Long cleanupInterval = 0L;
- /** Timer that schedules sender cleanup task. */
- private Timer cleanupTimer = null;
- /** Notification index cleanup. */
- private NotificationIndexCleanup notificationCleanup = null;
- /** How many products to process at the same time. */
- private int concurrentProducts = 1;
- /** Whether or not to process products that have already been processed. */
- private boolean processDuplicates = false;
- /** Array of content paths to search. */
- private final ArrayList<String> includePaths = new ArrayList<String>();
- /** Array of content paths to search. */
- private final ArrayList<String> excludePaths = new ArrayList<String>();
- /**
- * Locks used to keep concurrent listener from processing product
- * more than once at the same time.
- */
- private ObjectLock<ProductId> storageLocks = new ObjectLock<ProductId>();
- /**
- * Implement the NotificationListener interface.
- *
- * This method calls accept, and if accept returns true, retrieves the
- * product and calls onProduct.
- */
- public void onNotification(final NotificationEvent event) throws Exception {
- Notification notification = event.getNotification();
- ProductId id = notification.getProductId();
- String productId = id.toString();
- LOGGER.finest("[" + getName() + "] received notification for id="
- + productId);
- if (!accept(id)) {
- LOGGER.finest("[" + getName() + "] rejected notification for id="
- + productId);
- return;
- }
- // only allow one thread to process a product
- storageLocks.acquireLock(id);
- try {
- if (!onBeforeProcessNotification(notification)) {
- return;
- }
- LOGGER.finer("[" + getName() + "] processing notification for id="
- + productId);
- Product product = event.getProduct();
- if (product == null) {
- throw new ContinuableListenerException("retrieved product null,"
- + " notification id=" + productId);
- }
- if (!onBeforeProcessProduct(product)) {
- return;
- }
- LOGGER.finer("[" + getName() + "] processing product for id="
- + productId);
- onProduct(product);
- onAfterProcessNotification(notification);
- } finally {
- // be sure to release lock when done/error
- storageLocks.releaseLock(id);
- }
- }
- /**
- * Called by onNotification when a product is retrieved.
- *
- * @param product
- * a product whose notification was accepted.
- * @throws Exception if error occurs
- */
- public void onProduct(final Product product) throws Exception {
- // subclasses do stuff here
- ProductId id = product.getId();
- StringBuffer b = new StringBuffer("[" + getName()
- + "] product processed source=" + id.getSource() + ", type="
- + id.getType() + ", code=" + id.getCode() + ", updateTime="
- + id.getUpdateTime().toString());
- Map<String, String> properties = product.getProperties();
- Iterator<String> iter = properties.keySet().iterator();
- while (iter.hasNext()) {
- String name = iter.next();
- b.append(", ").append(name).append("=")
- .append(properties.get(name));
- }
- LOGGER.info(b.toString());
- System.out.println(b.toString());
- }
- /**
- * Called just before this listener processes a notification.
- *
- * @param notification
- * notification about to be processed.
- * @return true to process the notification, false to skip
- * @throws Exception if error occurs
- */
- protected boolean onBeforeProcessNotification(
- final Notification notification) throws Exception {
- // only check if we care
- if (!processDuplicates && this.notificationIndex != null) {
- List<Notification> notifications = this.notificationIndex
- .findNotifications(notification.getProductId());
- if (notifications.size() > 0) {
- LOGGER.finer("[" + getName()
- + "] skipping existing product "
- + notification.getProductId().toString());
- return false;
- }
- }
- return true;
- }
- /**
- * Called after a product has been downloaded, but before onProduct is
- * called.
- *
- * Sometimes a listener cannot tell whether it should process a product
- * until its contents are available. This is where the "includePaths" and
- * "excludePaths" are evaluated.
- *
- * @param product
- * product about to be processed.
- * @return true to process the product, false to skip
- * @throws Exception if error occurs
- */
- protected boolean onBeforeProcessProduct(final Product product)
- throws Exception {
- if (excludePaths.size() != 0) {
- Map<String, Content> contents = product.getContents();
- Iterator<String> pathIter = excludePaths.iterator();
- while (pathIter.hasNext()) {
- String path = pathIter.next();
- if (contents.containsKey(path)) {
- // contains at least one matching include path
- LOGGER.fine("[" + getName() + "] skipping product "
- + product.getId().toString() + ", '" + path
- + "' matches excludePaths");
- return false;
- }
- }
- }
- if (includePaths.size() != 0) {
- boolean containsPath = false;
- Map<String, Content> contents = product.getContents();
- Iterator<String> pathIter = includePaths.iterator();
- while (pathIter.hasNext()) {
- String path = pathIter.next();
- if (contents.containsKey(path)) {
- // contains at least one matching include path
- containsPath = true;
- break;
- }
- }
- if (!containsPath) {
- LOGGER.fine("[" + getName() + "] skipping product "
- + product.getId().toString()
- + ", does not match includePaths");
- return false;
- }
- }
- return true;
- }
- /**
- * Called when this listener has successfully processed a notification.
- *
- * @param notification
- * notification that was processed.
- * @throws Exception if error occurs
- */
- protected void onAfterProcessNotification(final Notification notification)
- throws Exception {
- if (this.notificationIndex != null) {
- this.notificationIndex.addNotification(notification);
- }
- }
- /**
- * Called when an expired notification is being removed from the index.
- *
- * @param notification to be removed
- * @throws Exception if error occurs
- */
- @Override
- public void onExpiredNotification(final Notification notification)
- throws Exception {
- // nothing to do
- }
- /**
- * Periodic cleanup task.
- *
- * Called every cleanupInterval milliseconds.
- */
- public void cleanup() throws Exception {
- LOGGER.finer("[" + getName() + "] running listener cleanup");
- if (this.notificationCleanup == null) {
- this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
- this.notificationCleanup.startup();
- } else {
- this.notificationCleanup.wakeUp();
- }
- }
- @Override
- public void startup() throws Exception {
- super.startup();
- if (this.notificationIndex != null) {
- this.notificationIndex.startup();
- }
- // only schedule cleanup if interval is non-zero
- if (cleanupInterval > 0) {
- cleanupTimer = new Timer();
- cleanupTimer.scheduleAtFixedRate(new TimerTask() {
- public void run() {
- try {
- cleanup();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception during cleanup", e);
- }
- }
- }, 0, cleanupInterval);
- }
- }
- @Override
- public void shutdown() throws Exception {
- super.shutdown();
- if (this.notificationCleanup != null) {
- try {
- this.notificationCleanup.shutdown();
- } catch (Exception e) {
- LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e);
- } finally {
- this.notificationCleanup = null;
- }
- }
- try {
- this.notificationIndex.shutdown();
- } catch (Exception e) {
- // ignore
- }
- try {
- this.cleanupTimer.cancel();
- } catch (Exception e) {
- // ignore
- }
- }
- @Override
- public void configure(final Config config) throws Exception {
- super.configure(config);
- String notificationIndexName = config
- .getProperty(NOTIFICATION_INDEX_PROPERTY);
- String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
- if (notificationIndexName != null) {
- LOGGER.config("[" + getName() + "] loading notification index '"
- + notificationIndexName + "'");
- notificationIndex = (NotificationIndex) Config.getConfig()
- .getObject(notificationIndexName);
- if (notificationIndex == null) {
- throw new ConfigurationException("[" + getName() + "] index '"
- + notificationIndexName
- + "' is not properly configured");
- }
- } else if (notificationIndexFile != null) {
- LOGGER.config("[" + getName() + "] using notification index '"
- + notificationIndexFile + "'");
- notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
- }
- cleanupInterval = Long.parseLong(config.getProperty(
- CLEANUP_INTERVAL_PROPERTY, DEFAULT_CLEANUP_INTERVAL));
- LOGGER.config("[" + getName() + "] cleanup interval = "
- + processDuplicates);
- concurrentProducts = Integer.parseInt(config.getProperty(
- CONCURRENT_PRODUCTS_PROPERTY, DEFAULT_CONCURRENT_PRODUCTS));
- LOGGER.config("[" + getName() + "] concurrent products = "
- + concurrentProducts);
- processDuplicates = Boolean.valueOf(config.getProperty(
- PROCESS_DUPLICATES, DEFAULT_PROCESS_DUPLICATES));
- LOGGER.config("[" + getName() + "] process duplicates = "
- + processDuplicates);
- includePaths.addAll(StringUtils.split(
- config.getProperty(INCLUDE_PATHS_PROPERTY), ","));
- LOGGER.config("[" + getName() + "] include paths = " + includePaths);
- excludePaths.addAll(StringUtils.split(
- config.getProperty(EXCLUDE_PATHS_PROPERTY), ","));
- LOGGER.config("[" + getName() + "] exclude paths = " + excludePaths);
- }
- /** @return notificationIndex */
- public NotificationIndex getNotificationIndex() {
- return notificationIndex;
- }
- /** @param notificationIndex to set */
- public void setNotificationIndex(NotificationIndex notificationIndex) {
- this.notificationIndex = notificationIndex;
- }
- /** @return cleanupInterval */
- public Long getCleanupInterval() {
- return cleanupInterval;
- }
- /** @param cleanupInterval long to set */
- public void setCleanupInterval(Long cleanupInterval) {
- this.cleanupInterval = cleanupInterval;
- }
- /** @return concurrentProducts */
- public int getConcurrentProducts() {
- return concurrentProducts;
- }
- /** @param concurrentProducts int to set */
- public void setConcurrentProducts(int concurrentProducts) {
- this.concurrentProducts = concurrentProducts;
- }
- /** @return processDuplicates */
- public boolean isProcessDuplicates() {
- return processDuplicates;
- }
- /** @param processDuplicates boolean to set */
- public void setProcessDuplicates(boolean processDuplicates) {
- this.processDuplicates = processDuplicates;
- }
- /**
- * @return the includePaths
- */
- public ArrayList<String> getIncludePaths() {
- return includePaths;
- }
- /**
- * @return the excludePaths
- */
- public ArrayList<String> getExcludePaths() {
- return excludePaths;
- }
- }