ReliableIndexerListener.java

  1. /*
  2.  * Reliable Indexer Listener
  3.  */
  4. package gov.usgs.earthquake.indexer;

  5. import java.util.List;
  6. import java.util.logging.Level;
  7. import java.util.logging.Logger;

  8. import gov.usgs.earthquake.distribution.ConfigurationException;
  9. import gov.usgs.util.Config;

  10. /**
  11.  * ReliableIndexerListener listens for product changes by the indexer, then handles the new products independently in a background thread.
  12.  *
  13.  * This class does little more than output logs for the products it has seen; it is designed to be extended.
  14.  *
  15.  * Several useful methods are available to be overridden or otherwise used:
  16.  * <ul>
  17.  * <li>onBeforeProcessThreadStart</li>
  18.  * <li>onProcessException</li>
  19.  * <li>getNextProducts</li>
  20.  * <li>processProduct</li>
  21.  * </ul>
  22.  *
  23.  * This class accepts an index for querying in config:
  24.  *
  25.  * <dl>
  26.  * <dt>index</dt>
  27.  * <dd>(Required) The index to use for product querying.</dd>
  28.  * </dl>
  29.  */

  30. public class ReliableIndexerListener extends DefaultIndexerListener implements Runnable {

  31.   /** Logger object */
  32.   protected static final Logger LOGGER = Logger
  33.           .getLogger(ReliableIndexerListener.class.getName());

  34.   private static final int PRODUCTS_PER_QUERY = 1000;

  35.   private boolean stopThread = false;
  36.   private long lastIndexId = -1;
  37.   private final Object syncObject = new Object();

  38.   private Thread processThread;
  39.   /** Product Index */
  40.   protected ProductIndex productIndex;

  41.   /**
  42.    * Sets up an object on start
  43.    *
  44.    * @param config configuration
  45.    *
  46.    * @throws Exception if missing product index
  47.    */
  48.   public void configure(Config config) throws Exception {
  49.     super.configure(config);

  50.     //Getting indexer for queries
  51.     String indexName = config.getProperty(Indexer.INDEX_CONFIG_PROPERTY);
  52.     if (indexName != null) {
  53.       LOGGER.config("[" + getName() + "] loading ProductIndex '"
  54.         + indexName + "'");
  55.       productIndex = (ProductIndex) Config.getConfig().getObject(indexName);
  56.     }
  57.     if (productIndex == null) {
  58.       throw new ConfigurationException("[" + getName()
  59.           + "] ProductIndex is required");
  60.     }
  61.   }

  62.   /**
  63.    * Wakes thread when indexer makes changes
  64.    *
  65.    * @param delta Indexer Event - not used
  66.    *
  67.    * @throws Exception if something goes wrong
  68.    */
  69.   public void onIndexerEvent(IndexerEvent delta) throws Exception {
  70.     //Synchronized on the syncObject so we don't miss events
  71.     synchronized (syncObject) {
  72.       syncObject.notify();
  73.     }
  74.     LOGGER.log(Level.FINEST,"[" + getName() + "] done being notified");
  75.   }

  76.   /**
  77.    * Thread main body. Waits until notified, then tries to get the next products and process them.
  78.    */
  79.   @Override
  80.   public void run() {
  81.     //Run until we're told not to
  82.     while (!stopThread) {

  83.       List<ProductSummary> productList = null;

  84.       //Synchronized so we aren't notified of new products right before we wait
  85.       synchronized (syncObject) {
  86.         try {
  87.           productList = getNextProducts();
  88.         } catch (Exception e) {
  89.           try {
  90.             //Handle exception if we can
  91.             this.onProductGetException(e);
  92.           } catch (Exception e2) {
  93.               //Do nothing if we can't
  94.           }
  95.         }
  96.         if (productList == null || productList.size() == 0) {
  97.           try {
  98.             //Wait when there are no more products to process
  99.             syncObject.wait();
  100.           } catch (InterruptedException ignore) {
  101.             //Ignore because it's most likely we get interrupted by shutdown
  102.             LOGGER.log(Level.FINE,"[" + getName() + "] was told to stop, or something went wrong");
  103.           }
  104.           continue;
  105.         }
  106.       }

  107.       //Process the products we have
  108.       for(ProductSummary summary : productList) {
  109.         LOGGER.log(Level.FINEST,"[" + getName() + "] preparing to process product " + summary.getIndexId());
  110.         //Check for shutdown every iteration so we don't hog shutdown time
  111.         synchronized(syncObject) {
  112.           if (stopThread) {
  113.             break;
  114.           }
  115.         }
  116.         try {
  117.           //Process the product types we're told to in configuration
  118.           LOGGER.log(Level.FINEST,"[" + getName() + "] determining if we can process product " + summary.getIndexId());
  119.           if (accept(summary.getId())) {
  120.             LOGGER.log(Level.FINEST,"[" + getName() + "] attempting to process product " + summary.getIndexId());
  121.             this.processProduct(summary);
  122.           }
  123.           //Update internal storage so we don't reprocess products
  124.           this.setLastIndexId(summary.getIndexId());
  125.         } catch (Exception e) {
  126.           try {
  127.             //Handle exception if we can
  128.             this.onProcessException(summary,e);
  129.           } catch(Exception e2) {
  130.             //Give up if we can't
  131.             break;
  132.           }
  133.         }
  134.       }

  135.     }
  136.   }

  137.   /**
  138.    * Starts thread
  139.    *
  140.    * Calls onBeforeProcessThreadStart() in case subclasses want to add functionality
  141.    *
  142.    * @throws Exception if there's a thread issue
  143.    * @throws Exception if thread start fails
  144.    */
  145.   @Override
  146.   public void startup() throws Exception{
  147.     super.startup();
  148.     this.onBeforeProcessThreadStart();
  149.     synchronized(syncObject) {
  150.       stopThread = false;
  151.       this.processThread = new Thread(this);
  152.     }
  153.     this.processThread.start();
  154.   }

  155.   /**
  156.    * Closes thread
  157.    *
  158.    * @throws Exception if there's a thread issue
  159.    */
  160.   @Override
  161.   public void shutdown() throws Exception {
  162.     try {
  163.       LOGGER.log(Level.FINEST,"[" + getName() + "] trying to shut down...");
  164.       //When the thread is ready, tell it to stop
  165.       synchronized (syncObject) {
  166.         stopThread = true;
  167.         this.processThread.interrupt();
  168.       }
  169.       this.processThread.join();
  170.     } finally {
  171.       super.shutdown();
  172.     }
  173.   }

  174.   /** @return ProductIndex */
  175.   public ProductIndex getProductIndex() {
  176.     return this.productIndex;
  177.   }

  178.   /** @param productIndex to set */
  179.   public void setProductIndex(ProductIndex productIndex) {
  180.     this.productIndex = productIndex;
  181.   }


  182.   ////////////////////////
  183.   //Stubs for subclasses//
  184.   ////////////////////////

  185.   /**
  186.    * Gets index ID of last processed product
  187.    * @return lastIndexId
  188.    */
  189.   public long getLastIndexId() {
  190.     return lastIndexId;
  191.   }

  192.   /**
  193.    * Sets index ID of last processed product
  194.    * @param lastIndexId to set
  195.    */
  196.   public void setLastIndexId(final long lastIndexId) {
  197.     this.lastIndexId = lastIndexId;
  198.   }

  199.   /**
  200.    * Run before thread start.
  201.    *
  202.    * @throws Exception available for subclasses
  203.    */
  204.   protected void onBeforeProcessThreadStart() throws Exception {
  205.     //Do database call to update lastIndexId
  206.   }

  207.   /**
  208.    * Exception handling for product fetch
  209.    *
  210.    * @param e the caught exception
  211.    * @throws Exception in case we can't handle the first exception
  212.    */
  213.   protected void onProductGetException(Exception e) throws Exception {
  214.     LOGGER.log(Level.WARNING, "[" + getName() + "] Exception getting next products", e);
  215.   }

  216.   /**
  217.    * Exception handling for product processing.
  218.    *
  219.    * @param product the product that gave us the error
  220.    * @param e the caught exception
  221.    *
  222.    * @throws Exception in case we can't handle the first exception.
  223.    */
  224.   protected void onProcessException(ProductSummary product, Exception e) throws Exception {
  225.     LOGGER.log(Level.WARNING, "[" + getName() + "] Exception processing product " + product.getId(), e);
  226.   }

  227.   /**
  228.    * Gets the next products using the index provided in Config
  229.    *
  230.    * @return List of product summaries
  231.    * @throws Exception if we have a database issue
  232.    */
  233.   public List<ProductSummary> getNextProducts() throws Exception{
  234.     ProductIndexQuery query = new ProductIndexQuery();
  235.     query.setLimit(PRODUCTS_PER_QUERY);
  236.     query.setOrderBy(JDBCProductIndex.SUMMARY_PRODUCT_INDEX_ID); //Currently the only public field; should maybe change
  237.     query.setMinProductIndexId(this.getLastIndexId()+1);

  238.     return productIndex.getProducts(query);
  239.   }

  240.   /**
  241.    * Does a task with each product
  242.    *
  243.    * @param product ProductSummary  to process
  244.    * @throws Exception available for subclasses
  245.    */
  246.   public void processProduct(final ProductSummary product) throws Exception {
  247.     //Do stuff
  248.     LOGGER.log(Level.FINER,"[" + getName() + "] processing product " + product.getId());
  249.   }



  250. }