DefaultNotificationListener.java

  1. /*
  2.  * DefaultNotificationListener
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import gov.usgs.earthquake.product.AbstractListener;
  6. import gov.usgs.earthquake.product.Content;
  7. import gov.usgs.earthquake.product.Product;
  8. import gov.usgs.earthquake.product.ProductId;
  9. import gov.usgs.util.Config;
  10. import gov.usgs.util.ObjectLock;
  11. import gov.usgs.util.StringUtils;

  12. import java.util.ArrayList;
  13. import java.util.Iterator;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.Timer;
  17. import java.util.TimerTask;
  18. import java.util.logging.Level;
  19. import java.util.logging.Logger;

  20. /**
  21.  * A base implementation of a notification listener. Implements functionality
  22.  * that is useful for most notification listeners.
  23.  *
  24.  * Sub classes should override the onProduct(Product) method to add custom
  25.  * processing.
  26.  *
  27.  * The DefaultNotificationListener extends the AbstractListener and can use any
  28.  * of those configuration parameters.
  29.  *
  30.  * @see gov.usgs.earthquake.product.AbstractListener
  31.  */
  32. public class DefaultNotificationListener extends AbstractListener implements
  33.         NotificationListener, NotificationIndexCleanup.Listener {

  34.     /** Logging object. */
  35.     private static final Logger LOGGER = Logger
  36.             .getLogger(DefaultNotificationListener.class.getName());

  37.     /** Property referencing a notification index config section. */
  38.     public static final String NOTIFICATION_INDEX_PROPERTY = "listenerIndex";
  39.     /** Property for listener index file */
  40.     public static final String INDEX_FILE_PROPERTY = "listenerIndexFile";

  41.     /** How long to wait until checking for expired notifications/products. */
  42.     public static final String CLEANUP_INTERVAL_PROPERTY = "cleanupInterval";
  43.     /** Default time to wait for cleanup. 1h */
  44.     public static final String DEFAULT_CLEANUP_INTERVAL = "3600000";

  45.     /** Property for concurrentProducts */
  46.     public static final String CONCURRENT_PRODUCTS_PROPERTY = "concurrentProducts";
  47.     /** How many products to process at a time.  */
  48.     public static final String DEFAULT_CONCURRENT_PRODUCTS = "1";

  49.     /** Whether or not to process products more than once. */
  50.     public static final String PROCESS_DUPLICATES = "processDuplicates";
  51.     /** Default for process duplicates. False */
  52.     public static final String DEFAULT_PROCESS_DUPLICATES = "false";

  53.     /** Filter products based on content paths they contain. */
  54.     public static final String INCLUDE_PATHS_PROPERTY = "includePaths";
  55.     /** Property for exludePaths */
  56.     public static final String EXCLUDE_PATHS_PROPERTY = "excludePaths";

  57.     /** Optional notification index. */
  58.     private NotificationIndex notificationIndex = null;

  59.     /** How often to run cleanup task, in ms, <=0 = off. */
  60.     private Long cleanupInterval = 0L;

  61.     /** Timer that schedules sender cleanup task. */
  62.     private Timer cleanupTimer = null;

  63.     /** Notification index cleanup. */
  64.     private NotificationIndexCleanup notificationCleanup = null;

  65.     /** How many products to process at the same time. */
  66.     private int concurrentProducts = 1;

  67.     /** Whether or not to process products that have already been processed. */
  68.     private boolean processDuplicates = false;

  69.     /** Array of content paths to search. */
  70.     private final ArrayList<String> includePaths = new ArrayList<String>();

  71.     /** Array of content paths to search. */
  72.     private final ArrayList<String> excludePaths = new ArrayList<String>();

  73.     /**
  74.      * Locks used to keep concurrent listener from processing product
  75.      * more than once at the same time.
  76.      */
  77.     private ObjectLock<ProductId> storageLocks = new ObjectLock<ProductId>();

  78.     /**
  79.      * Implement the NotificationListener interface.
  80.      *
  81.      * This method calls accept, and if accept returns true, retrieves the
  82.      * product and calls onProduct.
  83.      */
  84.     public void onNotification(final NotificationEvent event) throws Exception {
  85.         Notification notification = event.getNotification();
  86.         ProductId id = notification.getProductId();
  87.         String productId = id.toString();

  88.         LOGGER.finest("[" + getName() + "] received notification for id="
  89.                 + productId);

  90.         if (!accept(id)) {
  91.             LOGGER.finest("[" + getName() + "] rejected notification for id="
  92.                     + productId);
  93.             return;
  94.         }

  95.         // only allow one thread to process a product
  96.         storageLocks.acquireLock(id);
  97.         try {
  98.             if (!onBeforeProcessNotification(notification)) {
  99.                 return;
  100.             }

  101.             LOGGER.finer("[" + getName() + "] processing notification for id="
  102.                     + productId);

  103.             Product product = event.getProduct();
  104.             if (product == null) {
  105.                 throw new ContinuableListenerException("retrieved product null,"
  106.                         + " notification id=" + productId);
  107.             }

  108.             if (!onBeforeProcessProduct(product)) {
  109.                 return;
  110.             }
  111.             LOGGER.finer("[" + getName() + "] processing product for id="
  112.                     + productId);
  113.             onProduct(product);

  114.             onAfterProcessNotification(notification);
  115.         } finally {
  116.             // be sure to release lock when done/error
  117.             storageLocks.releaseLock(id);
  118.         }
  119.     }

  120.     /**
  121.      * Called by onNotification when a product is retrieved.
  122.      *
  123.      * @param product
  124.      *            a product whose notification was accepted.
  125.      * @throws Exception if error occurs
  126.      */
  127.     public void onProduct(final Product product) throws Exception {
  128.         // subclasses do stuff here
  129.         ProductId id = product.getId();
  130.         StringBuffer b = new StringBuffer("[" + getName()
  131.                 + "] product processed source=" + id.getSource() + ", type="
  132.                 + id.getType() + ", code=" + id.getCode() + ", updateTime="
  133.                 + id.getUpdateTime().toString());

  134.         Map<String, String> properties = product.getProperties();
  135.         Iterator<String> iter = properties.keySet().iterator();
  136.         while (iter.hasNext()) {
  137.             String name = iter.next();
  138.             b.append(", ").append(name).append("=")
  139.                     .append(properties.get(name));
  140.         }

  141.         LOGGER.info(b.toString());
  142.         System.out.println(b.toString());
  143.     }

  144.     /**
  145.      * Called just before this listener processes a notification.
  146.      *
  147.      * @param notification
  148.      *            notification about to be processed.
  149.      * @return true to process the notification, false to skip
  150.      * @throws Exception if error occurs
  151.      */
  152.     protected boolean onBeforeProcessNotification(
  153.             final Notification notification) throws Exception {
  154.         // only check if we care
  155.         if (!processDuplicates && this.notificationIndex != null) {
  156.             List<Notification> notifications = this.notificationIndex
  157.                     .findNotifications(notification.getProductId());
  158.             if (notifications.size() > 0) {
  159.                 LOGGER.finer("[" + getName()
  160.                         + "] skipping existing product "
  161.                         + notification.getProductId().toString());
  162.                 return false;
  163.             }
  164.         }

  165.         return true;
  166.     }

  167.     /**
  168.      * Called after a product has been downloaded, but before onProduct is
  169.      * called.
  170.      *
  171.      * Sometimes a listener cannot tell whether it should process a product
  172.      * until its contents are available. This is where the "includePaths" and
  173.      * "excludePaths" are evaluated.
  174.      *
  175.      * @param product
  176.      *            product about to be processed.
  177.      * @return true to process the product, false to skip
  178.      * @throws Exception if error occurs
  179.      */
  180.     protected boolean onBeforeProcessProduct(final Product product)
  181.             throws Exception {
  182.         if (excludePaths.size() != 0) {
  183.             Map<String, Content> contents = product.getContents();
  184.             Iterator<String> pathIter = excludePaths.iterator();
  185.             while (pathIter.hasNext()) {
  186.                 String path = pathIter.next();
  187.                 if (contents.containsKey(path)) {
  188.                     // contains at least one matching include path
  189.                     LOGGER.fine("[" + getName() + "] skipping product "
  190.                             + product.getId().toString() + ", '" + path
  191.                             + "' matches excludePaths");
  192.                     return false;
  193.                 }
  194.             }
  195.         }

  196.         if (includePaths.size() != 0) {
  197.             boolean containsPath = false;
  198.             Map<String, Content> contents = product.getContents();
  199.             Iterator<String> pathIter = includePaths.iterator();
  200.             while (pathIter.hasNext()) {
  201.                 String path = pathIter.next();
  202.                 if (contents.containsKey(path)) {
  203.                     // contains at least one matching include path
  204.                     containsPath = true;
  205.                     break;
  206.                 }
  207.             }
  208.             if (!containsPath) {
  209.                 LOGGER.fine("[" + getName() + "] skipping product "
  210.                         + product.getId().toString()
  211.                         + ", does not match includePaths");
  212.                 return false;
  213.             }
  214.         }

  215.         return true;
  216.     }

  217.     /**
  218.      * Called when this listener has successfully processed a notification.
  219.      *
  220.      * @param notification
  221.      *            notification that was processed.
  222.      * @throws Exception if error occurs
  223.      */
  224.     protected void onAfterProcessNotification(final Notification notification)
  225.             throws Exception {
  226.         if (this.notificationIndex != null) {
  227.             this.notificationIndex.addNotification(notification);
  228.         }
  229.     }

  230.     /**
  231.      * Called when an expired notification is being removed from the index.
  232.      *
  233.      * @param notification to be removed
  234.      * @throws Exception if error occurs
  235.      */
  236.     @Override
  237.     public void onExpiredNotification(final Notification notification)
  238.             throws Exception {
  239.         // nothing to do
  240.     }

  241.         /**
  242.      * Periodic cleanup task.
  243.      *
  244.      * Called every cleanupInterval milliseconds.
  245.      */
  246.     public void cleanup() throws Exception {
  247.         LOGGER.finer("[" + getName() + "] running listener cleanup");
  248.         if (this.notificationCleanup == null) {
  249.             this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
  250.             this.notificationCleanup.startup();
  251.         } else {
  252.             this.notificationCleanup.wakeUp();
  253.         }
  254.     }

  255.     @Override
  256.     public void startup() throws Exception {
  257.         super.startup();
  258.         if (this.notificationIndex != null) {
  259.             this.notificationIndex.startup();
  260.         }

  261.         // only schedule cleanup if interval is non-zero
  262.         if (cleanupInterval > 0) {
  263.             cleanupTimer = new Timer();
  264.             cleanupTimer.scheduleAtFixedRate(new TimerTask() {
  265.                 public void run() {
  266.                     try {
  267.                         cleanup();
  268.                     } catch (Exception e) {
  269.                         LOGGER.log(Level.WARNING, "[" + getName()
  270.                                 + "] exception during cleanup", e);
  271.                     }
  272.                 }
  273.             }, 0, cleanupInterval);
  274.         }
  275.     }

  276.     @Override
  277.     public void shutdown() throws Exception {
  278.         super.shutdown();
  279.         if (this.notificationCleanup != null) {
  280.             try {
  281.                 this.notificationCleanup.shutdown();
  282.             } catch (Exception e) {
  283.                 LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e);
  284.             } finally {
  285.                 this.notificationCleanup = null;
  286.             }
  287.         }
  288.         try {
  289.             this.notificationIndex.shutdown();
  290.         } catch (Exception e) {
  291.             // ignore
  292.         }
  293.         try {
  294.             this.cleanupTimer.cancel();
  295.         } catch (Exception e) {
  296.             // ignore
  297.         }
  298.     }

  299.     @Override
  300.     public void configure(final Config config) throws Exception {
  301.         super.configure(config);

  302.         String notificationIndexName = config
  303.                 .getProperty(NOTIFICATION_INDEX_PROPERTY);
  304.         String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
  305.         if (notificationIndexName != null) {
  306.             LOGGER.config("[" + getName() + "] loading notification index '"
  307.                     + notificationIndexName + "'");
  308.             notificationIndex = (NotificationIndex) Config.getConfig()
  309.                     .getObject(notificationIndexName);
  310.             if (notificationIndex == null) {
  311.                 throw new ConfigurationException("[" + getName() + "] index '"
  312.                         + notificationIndexName
  313.                         + "' is not properly configured");
  314.             }
  315.         } else if (notificationIndexFile != null) {
  316.             LOGGER.config("[" + getName() + "] using notification index '"
  317.                     + notificationIndexFile + "'");
  318.             notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
  319.         }

  320.         cleanupInterval = Long.parseLong(config.getProperty(
  321.                 CLEANUP_INTERVAL_PROPERTY, DEFAULT_CLEANUP_INTERVAL));
  322.         LOGGER.config("[" + getName() + "] cleanup interval = "
  323.                 + processDuplicates);

  324.         concurrentProducts = Integer.parseInt(config.getProperty(
  325.             CONCURRENT_PRODUCTS_PROPERTY, DEFAULT_CONCURRENT_PRODUCTS));
  326.         LOGGER.config("[" + getName() + "] concurrent products = "
  327.                 + concurrentProducts);

  328.         processDuplicates = Boolean.valueOf(config.getProperty(
  329.                 PROCESS_DUPLICATES, DEFAULT_PROCESS_DUPLICATES));
  330.         LOGGER.config("[" + getName() + "] process duplicates = "
  331.                 + processDuplicates);

  332.         includePaths.addAll(StringUtils.split(
  333.                 config.getProperty(INCLUDE_PATHS_PROPERTY), ","));
  334.         LOGGER.config("[" + getName() + "] include paths = " + includePaths);

  335.         excludePaths.addAll(StringUtils.split(
  336.                 config.getProperty(EXCLUDE_PATHS_PROPERTY), ","));
  337.         LOGGER.config("[" + getName() + "] exclude paths = " + excludePaths);
  338.     }

  339.     /** @return notificationIndex */
  340.     public NotificationIndex getNotificationIndex() {
  341.         return notificationIndex;
  342.     }

  343.     /** @param notificationIndex to set */
  344.     public void setNotificationIndex(NotificationIndex notificationIndex) {
  345.         this.notificationIndex = notificationIndex;
  346.     }

  347.     /** @return cleanupInterval */
  348.     public Long getCleanupInterval() {
  349.         return cleanupInterval;
  350.     }

  351.     /** @param cleanupInterval long to set */
  352.     public void setCleanupInterval(Long cleanupInterval) {
  353.         this.cleanupInterval = cleanupInterval;
  354.     }

  355.     /** @return concurrentProducts */
  356.     public int getConcurrentProducts() {
  357.         return concurrentProducts;
  358.     }

  359.     /** @param concurrentProducts int to set */
  360.     public void setConcurrentProducts(int concurrentProducts) {
  361.         this.concurrentProducts = concurrentProducts;
  362.     }

  363.     /** @return processDuplicates */
  364.     public boolean isProcessDuplicates() {
  365.         return processDuplicates;
  366.     }

  367.     /** @param processDuplicates boolean to set */
  368.     public void setProcessDuplicates(boolean processDuplicates) {
  369.         this.processDuplicates = processDuplicates;
  370.     }

  371.     /**
  372.      * @return the includePaths
  373.      */
  374.     public ArrayList<String> getIncludePaths() {
  375.         return includePaths;
  376.     }

  377.     /**
  378.      * @return the excludePaths
  379.      */
  380.     public ArrayList<String> getExcludePaths() {
  381.         return excludePaths;
  382.     }

  383. }