ReliableIndexerListener.java

/*
 * Reliable Indexer Listener
 */
package gov.usgs.earthquake.indexer;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.util.Config;

/**
 * ReliableIndexerListener listens for product changes by the indexer, then handles the new products independently in a background thread.
 *
 * This class does little more than output logs for the products it has seen; it is designed to be extended.
 *
 * Several useful methods are availble to be overridden or otherwise used:
 * <ul>
 * <li>onBeforeProcessThreadStart</li>
 * <li>onProcessException</li>
 * <li>getNextProducts</li>
 * <li>processProducts</li>
 * </ul>
 *
 * This class accepts an index for querying in config:
 *
 * <dl>
 * <dt>index</dt>
 * <dd>(Required) The index to use for product querying.</dd>
 * </dl>
 */

public class ReliableIndexerListener extends DefaultIndexerListener implements Runnable {

  /** Logger object */
  protected static final Logger LOGGER = Logger
          .getLogger(ReliableIndexerListener.class.getName());

  private static final int PRODUCTS_PER_QUERY = 1000;

  private boolean stopThread = false;
  private long lastIndexId = -1;
  private final Object syncObject = new Object();

  private Thread processThread;
  /** Product Index */
  protected ProductIndex productIndex;

  /**
   * Sets up an object on start
   *
   * @param config configuration
   *
   * @throws Exception if missing product index
   */
  public void configure(Config config) throws Exception {
    super.configure(config);

    //Getting indexer for queries
    String indexName = config.getProperty(Indexer.INDEX_CONFIG_PROPERTY);
    if (indexName != null) {
      LOGGER.config("[" + getName() + "] loading ProductIndex '"
        + indexName + "'");
      productIndex = (ProductIndex) Config.getConfig().getObject(indexName);
    }
    if (productIndex == null) {
      throw new ConfigurationException("[" + getName()
          + "] ProductIndex is required");
    }
  }

  /**
   * Wakes thread when indexer makes changes
   *
   * @param delta Indexer Event - not used
   *
   * @throws Exception if something goes wrong
   */
  public void onIndexerEvent(IndexerEvent delta) throws Exception {
    //Synchronized on the syncObject so we don't miss events
    synchronized (syncObject) {
      syncObject.notify();
    }
    LOGGER.log(Level.FINEST,"[" + getName() + "] done being notified");
  }

  /**
   * Thread main body. Waits until notified, then tries to get the next products and process them.
   */
  @Override
  public void run() {
    //Run until we're told not to
    while (!stopThread) {

      List<ProductSummary> productList = null;

      //Synchronized so we aren't notified of new products right before we wait
      synchronized (syncObject) {
        try {
          productList = getNextProducts();
        } catch (Exception e) {
          try {
            //Handle exception if we can
            this.onProductGetException(e);
          } catch (Exception e2) {
              //Do nothing if we can't
          }
        }
        if (productList == null || productList.size() == 0) {
          try {
            //Wait when there are no more products to process
            syncObject.wait();
          } catch (InterruptedException ignore) {
            //Ignore because it's most likely we get interrupted by shutdown
            LOGGER.log(Level.FINE,"[" + getName() + "] was told to stop, or something went wrong");
          }
          continue;
        }
      }

      //Process the products we have
      for(ProductSummary summary : productList) {
        LOGGER.log(Level.FINEST,"[" + getName() + "] preparing to process product " + summary.getIndexId());
        //Check for shutdown every iteration so we don't hog shutdown time
        synchronized(syncObject) {
          if (stopThread) {
            break;
          }
        }
        try {
          //Process the product types we're told to in configuration
          LOGGER.log(Level.FINEST,"[" + getName() + "] determining if we can process product " + summary.getIndexId());
          if (accept(summary.getId())) {
            LOGGER.log(Level.FINEST,"[" + getName() + "] attempting to process product " + summary.getIndexId());
            this.processProduct(summary);
          }
          //Update internal storage so we don't reprocess products
          this.setLastIndexId(summary.getIndexId());
        } catch (Exception e) {
          try {
            //Handle exception if we can
            this.onProcessException(summary,e);
          } catch(Exception e2) {
            //Give up if we can't
            break;
          }
        }
      }

    }
  }

  /**
   * Starts thread
   *
   * Calls onBeforeProcessThreadStart() in case subclasses want to add functionality
   *
   * @throws Exception if there's a thread issue
   * @throws Exception if thread start fails
   */
  @Override
  public void startup() throws Exception{
    super.startup();
    this.onBeforeProcessThreadStart();
    synchronized(syncObject) {
      stopThread = false;
      this.processThread = new Thread(this);
    }
    this.processThread.start();
  }

  /**
   * Closes thread
   *
   * @throws Exception if there's a thread issue
   */
  @Override
  public void shutdown() throws Exception {
    try {
      LOGGER.log(Level.FINEST,"[" + getName() + "] trying to shut down...");
      //When the thread is ready, tell it to stop
      synchronized (syncObject) {
        stopThread = true;
        this.processThread.interrupt();
      }
      this.processThread.join();
    } finally {
      super.shutdown();
    }
  }

  /** @return ProductIndex */
  public ProductIndex getProductIndex() {
    return this.productIndex;
  }

  /** @param productIndex to set */
  public void setProductIndex(ProductIndex productIndex) {
    this.productIndex = productIndex;
  }


  ////////////////////////
  //Stubs for subclasses//
  ////////////////////////

  /**
   * Gets index ID of last processed product
   * @return lastIndexId
   */
  public long getLastIndexId() {
    return lastIndexId;
  }

  /**
   * Sets index ID of last processed product
   * @param lastIndexId to set
   */
  public void setLastIndexId(final long lastIndexId) {
    this.lastIndexId = lastIndexId;
  }

  /**
   * Run before thread start.
   *
   * @throws Exception available for subclasses
   */
  protected void onBeforeProcessThreadStart() throws Exception {
    //Do database call to update lastIndexId
  }

  /**
   * Exception handling for product fetch
   *
   * @param e the caught exception
   * @throws Exception in case we can't handle the first exception
   */
  protected void onProductGetException(Exception e) throws Exception {
    LOGGER.log(Level.WARNING, "[" + getName() + "] Exception getting next products", e);
  }

  /**
   * Exception handling for product processing.
   *
   * @param product the product that gave us the error
   * @param e the caught exception
   *
   * @throws Exception in case we can't handle the first exception.
   */
  protected void onProcessException(ProductSummary product, Exception e) throws Exception {
    LOGGER.log(Level.WARNING, "[" + getName() + "] Exception processing product " + product.getId(), e);
  }

  /**
   * Gets the next products using the index provided in Config
   *
   * @return List of product summaries
   * @throws Exception if we have a database issue
   */
  public List<ProductSummary> getNextProducts() throws Exception{
    ProductIndexQuery query = new ProductIndexQuery();
    query.setLimit(PRODUCTS_PER_QUERY);
    query.setOrderBy(JDBCProductIndex.SUMMARY_PRODUCT_INDEX_ID); //Currently the only public field; should maybe change
    query.setMinProductIndexId(this.getLastIndexId()+1);

    return productIndex.getProducts(query);
  }

  /**
   * Does a task with each product
   *
   * @param product ProductSummary  to process
   * @throws Exception available for subclasses
   */
  public void processProduct(final ProductSummary product) throws Exception {
    //Do stuff
    LOGGER.log(Level.FINER,"[" + getName() + "] processing product " + product.getId());
  }



}