ReliableIndexerListener.java
- /*
- * Reliable Indexer Listener
- */
- package gov.usgs.earthquake.indexer;
- import java.util.List;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.util.Config;
- /**
- * ReliableIndexerListener listens for product changes by the indexer, then handles the new products independently in a background thread.
- *
- * This class does little more than output logs for the products it has seen; it is designed to be extended.
- *
- * Several useful methods are availble to be overridden or otherwise used:
- * <ul>
- * <li>onBeforeProcessThreadStart</li>
- * <li>onProcessException</li>
- * <li>getNextProducts</li>
- * <li>processProducts</li>
- * </ul>
- *
- * This class accepts an index for querying in config:
- *
- * <dl>
- * <dt>index</dt>
- * <dd>(Required) The index to use for product querying.</dd>
- * </dl>
- */
- public class ReliableIndexerListener extends DefaultIndexerListener implements Runnable {
- /** Logger object */
- protected static final Logger LOGGER = Logger
- .getLogger(ReliableIndexerListener.class.getName());
- private static final int PRODUCTS_PER_QUERY = 1000;
- private boolean stopThread = false;
- private long lastIndexId = -1;
- private final Object syncObject = new Object();
- private Thread processThread;
- /** Product Index */
- protected ProductIndex productIndex;
- /**
- * Sets up an object on start
- *
- * @param config configuration
- *
- * @throws Exception if missing product index
- */
- public void configure(Config config) throws Exception {
- super.configure(config);
- //Getting indexer for queries
- String indexName = config.getProperty(Indexer.INDEX_CONFIG_PROPERTY);
- if (indexName != null) {
- LOGGER.config("[" + getName() + "] loading ProductIndex '"
- + indexName + "'");
- productIndex = (ProductIndex) Config.getConfig().getObject(indexName);
- }
- if (productIndex == null) {
- throw new ConfigurationException("[" + getName()
- + "] ProductIndex is required");
- }
- }
- /**
- * Wakes thread when indexer makes changes
- *
- * @param delta Indexer Event - not used
- *
- * @throws Exception if something goes wrong
- */
- public void onIndexerEvent(IndexerEvent delta) throws Exception {
- //Synchronized on the syncObject so we don't miss events
- synchronized (syncObject) {
- syncObject.notify();
- }
- LOGGER.log(Level.FINEST,"[" + getName() + "] done being notified");
- }
- /**
- * Thread main body. Waits until notified, then tries to get the next products and process them.
- */
- @Override
- public void run() {
- //Run until we're told not to
- while (!stopThread) {
- List<ProductSummary> productList = null;
- //Synchronized so we aren't notified of new products right before we wait
- synchronized (syncObject) {
- try {
- productList = getNextProducts();
- } catch (Exception e) {
- try {
- //Handle exception if we can
- this.onProductGetException(e);
- } catch (Exception e2) {
- //Do nothing if we can't
- }
- }
- if (productList == null || productList.size() == 0) {
- try {
- //Wait when there are no more products to process
- syncObject.wait();
- } catch (InterruptedException ignore) {
- //Ignore because it's most likely we get interrupted by shutdown
- LOGGER.log(Level.FINE,"[" + getName() + "] was told to stop, or something went wrong");
- }
- continue;
- }
- }
- //Process the products we have
- for(ProductSummary summary : productList) {
- LOGGER.log(Level.FINEST,"[" + getName() + "] preparing to process product " + summary.getIndexId());
- //Check for shutdown every iteration so we don't hog shutdown time
- synchronized(syncObject) {
- if (stopThread) {
- break;
- }
- }
- try {
- //Process the product types we're told to in configuration
- LOGGER.log(Level.FINEST,"[" + getName() + "] determining if we can process product " + summary.getIndexId());
- if (accept(summary.getId())) {
- LOGGER.log(Level.FINEST,"[" + getName() + "] attempting to process product " + summary.getIndexId());
- this.processProduct(summary);
- }
- //Update internal storage so we don't reprocess products
- this.setLastIndexId(summary.getIndexId());
- } catch (Exception e) {
- try {
- //Handle exception if we can
- this.onProcessException(summary,e);
- } catch(Exception e2) {
- //Give up if we can't
- break;
- }
- }
- }
- }
- }
- /**
- * Starts thread
- *
- * Calls onBeforeProcessThreadStart() in case subclasses want to add functionality
- *
- * @throws Exception if there's a thread issue
- * @throws Exception if thread start fails
- */
- @Override
- public void startup() throws Exception{
- super.startup();
- this.onBeforeProcessThreadStart();
- synchronized(syncObject) {
- stopThread = false;
- this.processThread = new Thread(this);
- }
- this.processThread.start();
- }
- /**
- * Closes thread
- *
- * @throws Exception if there's a thread issue
- */
- @Override
- public void shutdown() throws Exception {
- try {
- LOGGER.log(Level.FINEST,"[" + getName() + "] trying to shut down...");
- //When the thread is ready, tell it to stop
- synchronized (syncObject) {
- stopThread = true;
- this.processThread.interrupt();
- }
- this.processThread.join();
- } finally {
- super.shutdown();
- }
- }
- /** @return ProductIndex */
- public ProductIndex getProductIndex() {
- return this.productIndex;
- }
- /** @param productIndex to set */
- public void setProductIndex(ProductIndex productIndex) {
- this.productIndex = productIndex;
- }
- ////////////////////////
- //Stubs for subclasses//
- ////////////////////////
- /**
- * Gets index ID of last processed product
- * @return lastIndexId
- */
- public long getLastIndexId() {
- return lastIndexId;
- }
- /**
- * Sets index ID of last processed product
- * @param lastIndexId to set
- */
- public void setLastIndexId(final long lastIndexId) {
- this.lastIndexId = lastIndexId;
- }
- /**
- * Run before thread start.
- *
- * @throws Exception available for subclasses
- */
- protected void onBeforeProcessThreadStart() throws Exception {
- //Do database call to update lastIndexId
- }
- /**
- * Exception handling for product fetch
- *
- * @param e the caught exception
- * @throws Exception in case we can't handle the first exception
- */
- protected void onProductGetException(Exception e) throws Exception {
- LOGGER.log(Level.WARNING, "[" + getName() + "] Exception getting next products", e);
- }
- /**
- * Exception handling for product processing.
- *
- * @param product the product that gave us the error
- * @param e the caught exception
- *
- * @throws Exception in case we can't handle the first exception.
- */
- protected void onProcessException(ProductSummary product, Exception e) throws Exception {
- LOGGER.log(Level.WARNING, "[" + getName() + "] Exception processing product " + product.getId(), e);
- }
- /**
- * Gets the next products using the index provided in Config
- *
- * @return List of product summaries
- * @throws Exception if we have a database issue
- */
- public List<ProductSummary> getNextProducts() throws Exception{
- ProductIndexQuery query = new ProductIndexQuery();
- query.setLimit(PRODUCTS_PER_QUERY);
- query.setOrderBy(JDBCProductIndex.SUMMARY_PRODUCT_INDEX_ID); //Currently the only public field; should maybe change
- query.setMinProductIndexId(this.getLastIndexId()+1);
- return productIndex.getProducts(query);
- }
- /**
- * Does a task with each product
- *
- * @param product ProductSummary to process
- * @throws Exception available for subclasses
- */
- public void processProduct(final ProductSummary product) throws Exception {
- //Do stuff
- LOGGER.log(Level.FINER,"[" + getName() + "] processing product " + product.getId());
- }
- }