Indexer.java

  1. /*
  2.  * Indexer
  3.  */
  4. package gov.usgs.earthquake.indexer;

  5. import gov.usgs.earthquake.distribution.ConfigurationException;
  6. import gov.usgs.earthquake.distribution.DefaultNotificationListener;
  7. import gov.usgs.earthquake.distribution.FileProductStorage;
  8. import gov.usgs.earthquake.distribution.HeartbeatListener;
  9. import gov.usgs.earthquake.distribution.Notification;
  10. import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
  11. import gov.usgs.earthquake.distribution.ProductStorage;
  12. import gov.usgs.earthquake.geoserve.ANSSRegionsFactory;
  13. import gov.usgs.earthquake.product.Product;
  14. import gov.usgs.earthquake.product.ProductId;
  15. import gov.usgs.earthquake.util.CompareUtil;
  16. import gov.usgs.util.Config;
  17. import gov.usgs.util.Configurable;
  18. import gov.usgs.util.FutureExecutorTask;
  19. import gov.usgs.util.StringUtils;

  20. import java.io.File;
  21. import java.util.ArrayList;
  22. import java.util.Date;
  23. import java.util.HashMap;
  24. import java.util.Iterator;
  25. import java.util.LinkedList;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Timer;
  29. import java.util.TimerTask;
  30. import java.util.concurrent.ExecutorService;
  31. import java.util.concurrent.Executors;
  32. import java.util.logging.Level;
  33. import java.util.logging.Logger;

  34. /**
  35.  * The indexer receives products from Distribution, and adds them to the
  36.  * EventIndex.
  37.  *
  38.  * This class provides the following configurable properties (in addition to
  39.  * those inherited from DefaultNotificationListener):
  40.  * <dl>
  41.  * <dt>associator</dt>
  42.  * <dd>An object that implements the Associator interface.</dd>
  43.  *
  44.  * <dt>storage</dt>
  45.  * <dd>An object that implements the ProductStorage interface.</dd>
  46.  *
  47.  * <dt>index</dt>
  48.  * <dd>An object that implements the ProductIndex interface.</dd>
  49.  *
  50.  * <dt>modules</dt>
  51.  * <dd>A comma delimited list of objects that implement the IndexerModule
  52.  * interface</dd>
  53.  *
  54.  * <dt>listeners</dt>
  55.  * <dd>A comma delimited list of objects that implement the IndexerListener
  56.  * interface</dd>
  57.  * </dl>
  58.  */
  59. public class Indexer extends DefaultNotificationListener {

    /** Logging utility. */
    private static final Logger LOGGER = Logger.getLogger(Indexer.class
            .getName());

    /** Preferred weight for persistent trump. */
    public static final long TRUMP_PREFERRED_WEIGHT = 100000000;
    /** Product type for persistent trump. */
    public static final String TRUMP_PRODUCT_TYPE = "trump";
    /** Prefix for persistent trump. */
    public static final String PERSISTENT_TRUMP_PREFIX = "trump-";

    /** Property name to configure a custom associator. */
    public static final String ASSOCIATOR_CONFIG_PROPERTY = "associator";
    /** Property name to configure whether to associate using current products. */
    public static final String ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY = "associateUsingCurrentProducts";
    /** Default value for the associate using current products property. */
    public static final String DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS = "false";

    /** Property name to configure a custom storage. */
    public static final String STORAGE_CONFIG_PROPERTY = "storage";

    /** Shortcut name to configure a file product storage. */
    public static final String STORAGE_DIRECTORY_CONFIG_PROPERTY = "storageDirectory";

    /** Property name to configure a custom index. */
    public static final String INDEX_CONFIG_PROPERTY = "index";

    /** Shortcut name to configure a sqlite index. */
    public static final String INDEXFILE_CONFIG_PROPERTY = "indexFile";

    /** Property name to configure modules. */
    public static final String MODULES_CONFIG_PROPERTY = "modules";

    /** Property name to configure listeners. */
    public static final String LISTENERS_CONFIG_PROPERTY = "listeners";

    /** Property name to configure local regions file. */
    public static final String LOCAL_REGIONS_PROPERTY = "localRegionsFile";
    /** Default path to local regions file. */
    public static final String DEFAULT_LOCAL_REGIONS = "regions.json";

    /** Property name to enable search socket. */
    public static final String ENABLE_SEARCH_PROPERTY = "enableSearch";
    /** Property name for search socket port. */
    public static final String SEARCH_PORT_PROPERTY = "searchPort";
    /** Property name for search socket thread pool size. */
    public static final String SEARCH_THREADS_PROPERTY = "searchThreads";

    /** Default value whether to enable search socket. */
    public static final String DEFAULT_ENABLE_SEARCH = "false";
    /** Default port where search socket listens. */
    public static final String DEFAULT_SEARCH_PORT = "11236";
    /** Default number of threads (concurrent searches) allowed. */
    public static final String DEFAULT_SEARCH_THREADS = "5";

    /** Utility used for associating products to events. */
    private Associator associator;

    /**
     * Whether association queries use all products (false) or only current
     * products (true).
     */
    private boolean associateUsingCurrentProducts = false;

    /** Where product contents are stored. */
    private ProductStorage productStorage;

    /** Index of stored products, and how they are related. */
    private ProductIndex productIndex;

    /**
     * Read index for {@link #hasProductBeenIndexed(ProductId)}. May be the
     * same object as {@link #productIndex}; that method picks its lock
     * depending on whether the two are identical.
     */
    private ProductIndex readProductIndex;

    /** Modules provide product specific functionality. */
    private List<IndexerModule> modules = new LinkedList<IndexerModule>();

    /**
     * Listeners listen for changes to the event index. Each listener is mapped
     * to the executor that runs its notifications.
     */
    private Map<IndexerListener, ExecutorService> listeners = new HashMap<IndexerListener, ExecutorService>();

    /** Local file where regions are stored. */
    private File localRegionsFile = new File(DEFAULT_LOCAL_REGIONS);

    /** Timer for archive policy thread. */
    private Timer archiveTimer = null;

    /** Task for archive policy thread. */
    private TimerTask archiveTask = null;

    /** Synchronization object for indexing. */
    private final Object indexProductSync = new Object();

    /**
     * Service used by FutureExecutorTask for execution.
     * See distribution.FutureListenerNotifier for more details.
     * Shared by the tasks of every registered listener.
     */
    private ExecutorService backgroundService;

    /** Whether archive policies are disabled: false runs them, true skips them. */
    private boolean disableArchive = false;

    // -- Configurable property names -- //
    /** Configurable property for index archive interval. */
    public static final String INDEX_ARCHIVE_INTERVAL_PROPERTY = "archiveInterval";
    /** Configurable property for index archive policy. */
    public static final String INDEX_ARCHIVE_POLICY_PROPERTY = "archivePolicy";

    // -- Default configurable property values -- //
    /**
     * Default archive interval.
     * NOTE(review): presumably milliseconds (300000 = 5 minutes) - confirm.
     */
    private static final long INDEX_ARCHIVE_INTERVAL_DEFAULT = 300000L;

    // -- Configured member variables. Values set in configure() method. -- //
    /** Interval between archive policy runs; 0 until configured. */
    private long archiveInterval = 0;

    /** Archive policies; initialized to an empty list by the constructor. */
    private List<ArchivePolicy> archivePolicies = null;

    /** Search socket; null unless one has been created. */
    private SearchServerSocket searchSocket = null;

    /** Module registered by the constructor so at least one module exists. */
    private DefaultIndexerModule defaultModule = new DefaultIndexerModule();

  144.     /**
  145.      * Default no-arg constructor. This gets called from the Configurable API.
  146.      * All configuration parameters are set in the "configure" method.
  147.      *
  148.      * @throws Exception
  149.      *             If the JDBCProductIndex throws an exception.
  150.      */
  151.     public Indexer() throws Exception {
  152.         addModule(defaultModule);

  153.         associator = new DefaultAssociator();
  154.         productStorage = new FileProductStorage();
  155.         productIndex = new JDBCProductIndex();
  156.         archivePolicies = new LinkedList<ArchivePolicy>();
  157.     }

  158.     /**
  159.      * Returns the current associator used to associate products to one-another
  160.      * and products to events.
  161.      *
  162.      * @return The current Associator.
  163.      */
  164.     public Associator getAssociator() {
  165.         return associator;
  166.     }

  167.     /**
  168.      * Sets the given associator as the current associator to associate products
  169.      * to one-another and products to events.
  170.      *
  171.      * @param associator
  172.      *            The associator to use from this point forward.
  173.      */
  174.     public void setAssociator(Associator associator) {
  175.         this.associator = associator;
  176.     }

  177.     /**
  178.      * Returns the product storage component that is used to store products as
  179.      * they are received.
  180.      *
  181.      * @return The current product storage component.
  182.      */
  183.     public ProductStorage getProductStorage() {
  184.         return productStorage;
  185.     }

  186.     /**
  187.      * Sets the current product storage component used to store products as they
  188.      * are received.
  189.      *
  190.      * @param productStorage
  191.      *            The product storage component to use from this point forward.
  192.      */
  193.     public void setProductStorage(ProductStorage productStorage) {
  194.         this.productStorage = productStorage;
  195.     }

  196.     /**
  197.      * Returns the product index component used to index product information as
  198.      * it is received.
  199.      *
  200.      * @return The current product index component.
  201.      */
  202.     public ProductIndex getProductIndex() {
  203.         return productIndex;
  204.     }

  205.     /**
  206.      * Sets the product index component used to index product information as it
  207.      * is received.
  208.      *
  209.      * @param productIndex
  210.      *            The product index component to use from this point forward.
  211.      */
  212.     public void setProductIndex(ProductIndex productIndex) {
  213.         this.productIndex = productIndex;
  214.     }

  215.     /**
  216.      * Adds the give indexer module to the current list of modules used by the
  217.      * indexer to handle products.
  218.      *
  219.      * @param toAdd
  220.      *            The IndexerModule to add to our list.
  221.      */
  222.     public void addModule(final IndexerModule toAdd) {
  223.         modules.add(toAdd);
  224.     }

  225.     /**
  226.      * Removes the first occurrence of the given indexer module from the current
  227.      * list of known modules.
  228.      *
  229.      * @param toRemove
  230.      *            The module to remove.
  231.      * @see java.util.LinkedList#remove(Object)
  232.      */
  233.     public void removeModule(final IndexerModule toRemove) {
  234.         modules.remove(toRemove);
  235.     }

  236.     /**
  237.      * This method checks each module's support level for the given product,
  238.      * returning the first module with the highest support level.
  239.      *
  240.      * @param product
  241.      *            the product to summarize.
  242.      * @return module best suited to summarize product.
  243.      */
  244.     protected IndexerModule getModule(final Product product) {
  245.         // mit is the module fetched off the iterator
  246.         // m is the module to return
  247.         IndexerModule mit = null, m = null;
  248.         Iterator<IndexerModule> it = modules.iterator();

  249.         // If there are no known modules, then null. Oops. :)
  250.         if (it.hasNext()) {
  251.             // Use first module so long as it exists
  252.             m = it.next();

  253.             // Check remaining modules if any offer greater support
  254.             while (it.hasNext()) {
  255.                 mit = it.next();
  256.                 // We use strictly greater than (no equals)
  257.                 if (mit.getSupportLevel(product) > m.getSupportLevel(product)) {
  258.                     m = mit;
  259.                 }
  260.             }
  261.         }
  262.         return m;
  263.     }

  264.     /**
  265.      * Adds a listener to this indexer. Listeners are notified when an event is
  266.      * added, updated, or deleted, or when a new product arrives and is
  267.      * un-associated to an event.
  268.      *
  269.      * @param toAdd
  270.      *            The IndexerListener to add
  271.      */
  272.     public void addListener(final IndexerListener toAdd) {
  273.         if (!listeners.containsKey(toAdd)) {
  274.             ExecutorService listenerExecutor = Executors
  275.                     .newSingleThreadExecutor();
  276.             listeners.put(toAdd, listenerExecutor);
  277.         }
  278.     }

  279.     /**
  280.      * Removes a listener from this indexer.Listeners are notified when an event
  281.      * is added, updated, or deleted, or when a new product arrives and is
  282.      * un-associated to an event.
  283.      *
  284.      * @param toRemove
  285.      *            The IndexerListener to remove
  286.      */
  287.     public void removeListener(final IndexerListener toRemove) {
  288.         // Remove listener from map
  289.         ExecutorService listenerExecutor = listeners.remove(toRemove);

  290.         if (listenerExecutor != null) {
  291.             // Shutdown executor thread
  292.             listenerExecutor.shutdown();
  293.         }

  294.         backgroundService.shutdown();
  295.         backgroundService = null;
  296.     }

  297.     /**
  298.      * Send an indexer event to all registered IndexerListeners.
  299.      *
  300.      * Creates a NotificationEvent, with a reference to this object and calls
  301.      * each notificationListeners onNotification method in separate threads.
  302.      *
  303.      * This method usually returns before registered NotificationListeners have
  304.      * completed processing a notification.
  305.      *
  306.      * @param event
  307.      *            The event that occurred to trigger the notification. Note: An
  308.      *            IndexerEvent has a specific "type" to clarify the type of
  309.      *            event that occurred.
  310.      */
  311.     protected synchronized void notifyListeners(final IndexerEvent event) {
  312.         StringBuffer buf = new StringBuffer();
  313.         Iterator<IndexerChange> changes = event.getIndexerChanges().iterator();
  314.         while (changes.hasNext()) {
  315.             IndexerChange change = changes.next();
  316.             buf.append("\n").append(change.getType().toString()).append(" ");
  317.             if (change.getOriginalEvent() == null) {
  318.                 buf.append("null");
  319.             } else {
  320.                 buf.append(change.getOriginalEvent().getEventId());
  321.             }
  322.             buf.append(" => ");
  323.             if (change.getNewEvent() == null) {
  324.                 buf.append("null");
  325.             } else {
  326.                 buf.append(change.getNewEvent().getEventId());
  327.             }
  328.         }

  329.         // Can't rely on event.getSummary because that might be null
  330.         ProductSummary theSummary = event.getSummary();
  331.         if (theSummary == null && event.getEvents().size() > 0) {
  332.             theSummary = event.getEvents().get(0).getEventIdProduct();
  333.         }

  334.         if (theSummary != null) {
  335.             LOGGER.log(Level.INFO, "[" + getName() + "] indexed product id="
  336.                     + theSummary.getId().toString()
  337.                     + ", status=" + theSummary.getStatus()
  338.                     + buf.toString());
  339.         } else {
  340.             LOGGER.log(Level.FINE, "[" + getName()
  341.                     + "] event summary was null. This probably "
  342.                     + "means the archive policy is notifying of an archived "
  343.                     + "event.");
  344.         }

  345.         Iterator<IndexerListener> it = listeners.keySet().iterator();
  346.         while (it.hasNext()) {
  347.             final IndexerListener listener = it.next();
  348.             ExecutorService listenerExecutor = listeners.get(listener);
  349.             FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(
  350.                     backgroundService, listenerExecutor, listener.getMaxTries(),
  351.                     listener.getTimeout(), new IndexerListenerCallable(listener,
  352.                             event));
  353.             listenerExecutor.submit(listenerTask);
  354.         }
  355.     }

  356.     /**
  357.      * Check whether this product is in the index.
  358.      *
  359.      * NOT synchronized to allow multiple threads to access.
  360.      * readProductIndex.hasProduct is synchronized.
  361.      *
  362.      * @param id ProductId to check
  363.      * @return true if product has already been indexed.
  364.      */
  365.     protected boolean hasProductBeenIndexed(final ProductId id) {
  366.         try {
  367.             if (readProductIndex == productIndex) {
  368.                 // synchronize on this if read and product index are same
  369.                 synchronized (indexProductSync) {
  370.                     readProductIndex.beginTransaction();
  371.                     try {
  372.                         boolean hasProduct = readProductIndex.hasProduct(id);
  373.                         readProductIndex.commitTransaction();
  374.                         return hasProduct;
  375.                     } catch (Exception e) {
  376.                         readProductIndex.rollbackTransaction();
  377.                     }
  378.                 }
  379.             } else {
  380.                 // otherwise synchronize on readProductIndex
  381.                 // transaction reconnects if needed
  382.                 synchronized (readProductIndex) {
  383.                     readProductIndex.beginTransaction();
  384.                     try {
  385.                         boolean hasProduct = readProductIndex.hasProduct(id);
  386.                         readProductIndex.commitTransaction();
  387.                         return hasProduct;
  388.                     } catch (Exception e) {
  389.                         readProductIndex.rollbackTransaction();
  390.                     }
  391.                 }
  392.             }
  393.         } catch (Exception wtf) {
  394.             LOGGER.log(Level.WARNING, "[" + getName()
  395.                     + "] exception checking if product already indexed", wtf);
  396.         }

  397.         // default is it hasn't been processed
  398.         return false;
  399.     }

  400.     /**
  401.      * Override the DefaultNotificationListener accept method,
  402.      * to always process products that may affect event association.
  403.      *
  404.      * @param id
  405.      *        the product id to check.
  406.      * @return boolean
  407.      *         whether the product should be indexed.
  408.      */
  409.     @Override
  410.     public boolean accept(final ProductId id) {
  411.         final boolean superAccept = super.accept(id);

  412.         if (!superAccept && isIncludeActuals()) {
  413.             // automatically accept products that affect association
  414.             // (if processing non-scenario products)
  415.             final String type = id.getType();
  416.             if (Event.ORIGIN_PRODUCT_TYPE.equals(type) ||
  417.                     Event.ASSOCIATE_PRODUCT_TYPE.equals(type) ||
  418.                     Event.DISASSOCIATE_PRODUCT_TYPE.equals(type)
  419.                     || type.startsWith(TRUMP_PRODUCT_TYPE)) {
  420.                 return true;
  421.             }
  422.         }

  423.         return superAccept;
  424.     }

  425.     /**
  426.      * Check whether to skip products that have already been indexed.
  427.      */
  428.     @Override
  429.     protected boolean onBeforeProcessNotification(Notification notification) throws Exception {
  430.         // try to short-circuit duplicates
  431.         if (!isProcessDuplicates() && hasProductBeenIndexed(notification.getProductId())) {
  432.             LOGGER.finer(
  433.                     "[" + getName() + "] notification already indexed, skipping "
  434.                     + notification.getProductId().toString());
  435.             return false;
  436.         }
  437.         // otherwise, use default behavior
  438.         return super.onBeforeProcessNotification(notification);
  439.     }

  440.     /**
  441.      * This method receives a product from Product Distribution and adds it to
  442.      * the index.
  443.      *
  444.      * Implementation follows from Product Indexer Diagram (pg.10) of
  445.      * ProductIndexer.pdf document dated 09/09/2010.
  446.      *
  447.      * Calls onProduct(product, false), which will not reprocess already
  448.      * processed products.
  449.      *
  450.      * @param product
  451.      *            The product triggering the event.
  452.      * @throws Exception
  453.      *             if an exception occurs.
  454.      */
  455.     @Override
  456.     public void onProduct(final Product product) throws Exception {
  457.         onProduct(product, false);
  458.     }

  459.     /**
  460.      * Receive a product and add it to the index. Optionally, reprocessing a
  461.      * product that has already been processed.
  462.      *
  463.      * @param product
  464.      *            The product triggering the event.
  465.      * @param force
  466.      *            Whether to reprocess products that have already been processed
  467.      *            (true), or skip (false).
  468.      * @throws Exception if error occurs
  469.      */
  470.     public void onProduct(final Product product, final boolean force) throws Exception {
  471.         ProductId id = product.getId();
  472.         final long beginStore = new Date().getTime();

  473.         // -------------------------------------------------------------------//
  474.         // -- Step 1: Store product
  475.         // -------------------------------------------------------------------//
  476.         final Product storedProduct = storeProduct(product, force);
  477.         if (storedProduct == null) {
  478.             return;
  479.         }

  480.         // -------------------------------------------------------------------//
  481.         // -- Step 2: Use product module to summarize product
  482.         // -------------------------------------------------------------------//

  483.         LOGGER.finer("[" + getName() + "] summarizing product id=" + id.toString());
  484.         final ProductSummary productSummary = summarizeProduct(product);

  485.         // -------------------------------------------------------------------//
  486.         // -- Step 3: Add product summary to the product index
  487.         // -------------------------------------------------------------------//

  488.         LOGGER.finer("[" + getName() + "] indexing product id=" + id.toString());
  489.         // measure time waiting to enter synchronized block
  490.         final long beforeEnterSync = new Date().getTime();
  491.         synchronized (indexProductSync) {
  492.             final long afterEnterSync = new Date().getTime();

  493.             try {
  494.                 indexProduct(productSummary);
  495.             } finally {
  496.                 final long endIndex = new Date().getTime();
  497.                 LOGGER.fine("[" + getName() + "] indexer processed product id="
  498.                         + id.toString() + " in " +
  499.                         (endIndex - beginStore) + " ms"
  500.                         + " (" + (afterEnterSync - beforeEnterSync) + " ms sync delay)");
  501.             }
  502.         }
  503.     }

  504.     /**
  505.      * Stores a product
  506.      * @param product Product to store
  507.      * @param force if should skip already indexed check
  508.      * @return Product if stored, null if not
  509.      * @throws Exception if error occurs
  510.      */
  511.     public Product storeProduct(final Product product, final boolean force) throws Exception {
  512.         final ProductId id = product.getId();
  513.         final long beginStore = new Date().getTime();
  514.         try {
  515.             LOGGER.finest("[" + getName() + "] storing product id="
  516.                     + id.toString());
  517.             productStorage.storeProduct(product);
  518.             LOGGER.finest("[" + getName() + "] stored product id="
  519.                     + id.toString());
  520.         } catch (ProductAlreadyInStorageException paise) {
  521.             LOGGER.finer("["
  522.                     + getName()
  523.                     + "] product already in indexer storage, checking if indexed");
  524.             if (force) {
  525.                 LOGGER.finer("[" + getName()
  526.                         + "] force=true skipping check, (re)process product");
  527.             } else if (hasProductBeenIndexed(id)) {
  528.                 LOGGER.fine("[" + getName() + "] product already indexed "
  529.                         + product.getId());
  530.                 // don't reindex for now
  531.                 return null;
  532.             }
  533.         }
  534.         final long endStore = new Date().getTime();
  535.         LOGGER.fine("[" + getName() + "] indexer downloaded product id="
  536.                 + id.toString() + " in " +
  537.                 (endStore - beginStore) + " ms");
  538.         return product;
  539.     }

  540.     /**
  541.      * Use modules to summarize product.
  542.      * @param product To summarize
  543.      * @return A product summary
  544.      * @throws Exception if error occurs
  545.      */
  546.     public ProductSummary summarizeProduct(final Product product) throws Exception {
  547.         // Find best available indexer module
  548.         IndexerModule module = getModule(product);
  549.         return module.getProductSummary(product);
  550.     }

  551.     /**
  552.      * Add product summary to product index.
  553.      * @param productSummary to add
  554.      * @return Summary added to index
  555.      * @throws Exception if error occurs
  556.      */
  557.     protected synchronized ProductSummary indexProduct(
  558.             ProductSummary productSummary) throws Exception {
  559.         LOGGER.finest("[" + getName() + "] beginning index transaction");

  560.         // The notification to be sent when we are finished with this product
  561.         IndexerEvent notification = new IndexerEvent(this);
  562.         notification.setIndex(getProductIndex());
  563.         notification.setSummary(productSummary);

  564.         // Start the product index transaction, only proceed if able
  565.         productIndex.beginTransaction();

  566.         try {
  567.             LOGGER.finer("[" + getName() + "] finding previous version");
  568.             // Check index for previous version of this product
  569.             ProductSummary prevSummary = getPrevProductVersion(productSummary);

  570.             LOGGER.finer("[" + getName() + "] finding previous event");
  571.             Event prevEvent = null;
  572.             boolean redundantProduct = isRedundantProduct(prevSummary, productSummary);
  573.             if (!redundantProduct) {
  574.                 // Skip association queries and use existing product association
  575.                 // performed in next branch (should be associated already if
  576.                 // "redundant").

  577.                 // Check index for existing event candidate
  578.                 prevEvent = getPrevEvent(productSummary, true);
  579.             }

  580.             // may be an update/delete to a product that previously associated
  581.             // to an event, even though this product isn't associating on its
  582.             // own
  583.             if (prevSummary != null && prevEvent == null) {
  584.                 // see if prevSummary associated with an event
  585.                 ProductIndexQuery prevEventQuery = new ProductIndexQuery();
  586.                 prevEventQuery.getProductIds().add(prevSummary.getId());
  587.                 if (associateUsingCurrentProducts) {
  588.                     prevEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
  589.                 }
  590.                 List<Event> prevEvents = productIndex.getEvents(prevEventQuery);
  591.                 if (prevEvents.size() != 0) {
  592.                     // just use first (there can really only be one).
  593.                     prevEvent = prevEvents.get(0);
  594.                 }
  595.             }

  596.             // special handling to allow trump products to associate based on
  597.             // a product link. Not used when eventsource/eventsourcecode set.
  598.             if (prevEvent == null
  599.                     && productSummary.getId().getType().equals(TRUMP_PRODUCT_TYPE)
  600.                     && productSummary.getLinks().containsKey("product")
  601.                     && !productSummary.getStatus().equalsIgnoreCase(
  602.                             Product.STATUS_DELETE)) {
  603.                 // see if we can associate via another product
  604.                 ProductIndexQuery otherEventQuery = new ProductIndexQuery();
  605.                 otherEventQuery.getProductIds().add(
  606.                         ProductId.parse(productSummary.getLinks()
  607.                                 .get("product").get(0).toString()));
  608.                 if (associateUsingCurrentProducts) {
  609.                     otherEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
  610.                 }
  611.                 List<Event> prevEvents = productIndex
  612.                         .getEvents(otherEventQuery);
  613.                 if (prevEvents.size() != 0) {
  614.                     // just use first (there can really only be one).
  615.                     prevEvent = prevEvents.get(0);
  616.                 }
  617.             }

  618.             // Add the summary to the index
  619.             LOGGER.finer("[" + getName() + "] adding summary to index");
  620.             if (prevSummary != null && prevSummary.equals(productSummary)) {
  621.                 // implied force=true, prevEvent!=null

  622.                 // remove the previous version of this product summary
  623.                 // so the new one can take its place
  624.                 if (prevEvent != null) {
  625.                     productIndex.removeAssociation(prevEvent, prevSummary);
  626.                 } else {
  627.                     LOGGER.fine("[" + getName()
  628.                             + "] reprocessing unassociated summary");
  629.                 }
  630.                 productIndex.removeProductSummary(prevSummary);
  631.             }
  632.             productSummary = productIndex.addProductSummary(productSummary);

  633.             Event event = null;
  634.             if (prevEvent == null) {
  635.                 // No existing event, try to create one and associate
  636.                 event = createEvent(productSummary);
  637.                 if (event != null) {
  638.                     LOGGER.finer("[" + getName() + "] created event indexid="
  639.                             + event.getIndexId());
  640.                     event.log(LOGGER);
  641.                 } else {
  642.                     LOGGER.finer("[" + getName()
  643.                             + "] unable to create event for product.");
  644.                 }
  645.             } else {
  646.                 LOGGER.finer("[" + getName()
  647.                         + "] found existing event indexid="
  648.                         + prevEvent.getIndexId());
  649.                 prevEvent.log(LOGGER);

  650.                 // Existing event found associate to it
  651.                 event = productIndex.addAssociation(prevEvent, productSummary);
  652.             }

  653.             // Can't split or merge a non-existent event
  654.             if (prevEvent != null && event != null) {
  655.                 LOGGER.finer("[" + getName() + "] checking for event splits");
  656.                 // Check for event splits
  657.                 notification.addIndexerChanges(checkForEventSplits(
  658.                         productSummary, prevEvent, event));
  659.             }

  660.             // Is this a problem??? split may modify the event, and then
  661.             // the unmodified version of that event is passed to merge???
  662.             // If this is a problem, checkForEventSplits and checkForEventMerges
  663.             // could be modified to accept the notification object (and add
  664.             // changes to it) and return the potentially modified object by
  665.             // reference.

  666.             if (event != null) {
  667.                 LOGGER.finer("[" + getName() + "] checking for event merges");
  668.                 // Check for event merges
  669.                 notification.addIndexerChanges(checkForEventMerges(
  670.                         productSummary, prevEvent, event));
  671.             }

  672.             // see if this is a trump product that needs special processing.
  673.             event = checkForTrump(event, productSummary, prevSummary);

  674.             // Set our notification indexer changes if not set yet
  675.             if (notification.getIndexerChanges().size() == 0) {
  676.                 if (prevEvent == null && event != null) {
  677.                     // No previous event, so event added.
  678.                     notification.addIndexerChange(new IndexerChange(
  679.                             IndexerChange.EVENT_ADDED, prevEvent, event));
  680.                 } else if (prevEvent != null && event != null) {
  681.                     // Previous existed so event updated.
  682.                     notification.addIndexerChange(new IndexerChange(event
  683.                             .isDeleted() ? IndexerChange.EVENT_DELETED
  684.                             : IndexerChange.EVENT_UPDATED, prevEvent, event));
  685.                 } else if (prevEvent == null && event == null) {
  686.                     // No event existed or could be created.

  687.                     if (prevSummary == null) {
  688.                         // No previous summary, product added.
  689.                         notification.addIndexerChange(new IndexerChange(
  690.                                 IndexerChange.PRODUCT_ADDED, null, null));
  691.                     } else {
  692.                         // Previous summary existed. Product updated.
  693.                         notification
  694.                                 .addIndexerChange(new IndexerChange(
  695.                                         productSummary.isDeleted() ? IndexerChange.PRODUCT_DELETED
  696.                                                 : IndexerChange.PRODUCT_UPDATED,
  697.                                         null, null));
  698.                     }
  699.                 }
  700.             }

  701.             LOGGER.finer("[" + getName()
  702.                     + "] updating event summary parameters");
  703.             // update preferred event parameters in index
  704.             productIndex.eventsUpdated(notification.getEvents());

  705.             LOGGER.finer("[" + getName() + "] committing transaction");
  706.             // Commit our changes to the index (after updating summary attrs)
  707.             productIndex.commitTransaction();
  708.         } catch (Exception e) {
  709.             LOGGER.log(Level.FINE, "[" + getName() + "] rolling back transaction", e);
  710.             // just rollback since it wasn't successful
  711.             productIndex.rollbackTransaction();

  712.             // send heartbeat info
  713.             HeartbeatListener.sendHeartbeatMessage(getName(),
  714.                     "index exception", productSummary.getId().toString());
  715.             // send heartbeat info
  716.             HeartbeatListener.sendHeartbeatMessage(getName(),
  717.                     "index exception class", e.getClass().getName());

  718.             throw e;
  719.         }

  720.         try {
  721.             LOGGER.fine("[" + getName() + "] notifying listeners");
  722.             // ---------------------------------------------------------//
  723.             // -- Step 5: Notify listeners with our indexer event
  724.             // ---------------------------------------------------------//
  725.             notifyListeners(notification);
  726.         } catch (Exception e) {
  727.             // this doesn't affect success of index transaction...
  728.             LOGGER.log(Level.WARNING, "[" + getName()
  729.                     + "] exception while notifying listeners", e);
  730.         }

  731.         // send heartbeat info
  732.         HeartbeatListener.sendHeartbeatMessage(getName(),
  733.                 "indexed product", productSummary.getId().toString());

  734.         // return summary after added to index
  735.         return productSummary;
  736.     }

  737.     /**
  738.      * Check whether two products are redundant, meaning would not affect event
  739.      * associations and indexer can skip split/merge steps.
  740.      *
  741.      * @param previous previous version of product.
  742.      * @param current current version of product.
  743.      * @return true if products are equivalent for association purposes.
  744.      */
  745.     private boolean isRedundantProduct(final ProductSummary previous, final ProductSummary current) {
  746.         if (previous == null || previous.equals(current)
  747.                 || !previous.getId().isSameProduct(current.getId())) {
  748.             return false;
  749.         }
  750.         if (previous.getPreferredWeight() == current.getPreferredWeight()
  751.                 && CompareUtil.nullSafeCompare(previous.getStatus(),
  752.                         current.getStatus()) == 0
  753.                 && CompareUtil.nullSafeCompare(previous.getEventDepth(),
  754.                         current.getEventDepth()) == 0
  755.                 && CompareUtil.nullSafeCompare(previous.getEventLatitude(),
  756.                         current.getEventLatitude()) == 0
  757.                 && CompareUtil.nullSafeCompare(previous.getEventLongitude(),
  758.                         current.getEventLongitude()) == 0
  759.                 && CompareUtil.nullSafeCompare(previous.getEventMagnitude(),
  760.                         current.getEventMagnitude()) == 0
  761.                 && CompareUtil.nullSafeCompare(previous.getEventSource(),
  762.                         current.getEventSource()) == 0
  763.                 && CompareUtil.nullSafeCompare(previous.getEventSourceCode(),
  764.                         current.getEventSourceCode()) == 0
  765.                 && CompareUtil.nullSafeCompare(previous.getEventTime(),
  766.                         current.getEventTime()) == 0) {
  767.             // these are the properties that would influence indexer associations
  768.             // or preferred event properties.
  769.             return true;
  770.         }
  771.         return false;
  772.     }

  773.     /**
  774.      * Check for, and handle incoming trump products.
  775.      *
  776.      * Handles version specific trump products, calls
  777.      * {@link #checkForPersistentTrump(Event, ProductSummary, ProductSummary)}
  778.      * to handle "persistent" trump products.
  779.      *
  780.      * VERSION SPECIFIC TRUMP
  781.      *
  782.      * Version specific trump products include:
  783.      * - a link with relation "product" that is a product id urn.
  784.      * - a property "weight" that defines the new preferred weight.
  785.      *
  786.      * Finds the associated product, resummarizes. If trump is deleted, product
  787.      * is associated as is. If trump is not deleted, set's preferred weight
  788.      * before reassociating.
  789.      *
  790.      * Preconditions:
  791.      * <ul>
  792.      * <li>The "trump" type product must associate with the correct event on its
  793.      * own. The admin pages accomplish this by sending
  794.      * eventsource/eventsourcecode of the associated product. This means that no
  795.      * product without an eventsource/eventsourcecode property may be trumped.</li>
  796.      * </ul>
  797.      *
  798.      * @param event
  799.      *            current event being updated.
  800.      * @param productSummary
  801.      *            product summary associated with event.
  802.      * @return updated event, or same event if not updated.
  803.      * @throws Exception
  804.      * @see {@link #checkForPersistentTrump(Event, ProductSummary, ProductSummary)}
  805.      */
  806.     private Event checkForTrump(Event event, ProductSummary productSummary,
  807.             ProductSummary prevSummary) throws Exception {
  808.         if (event == null) {
  809.             return event;
  810.         }

  811.         String type = productSummary.getId().getType();
  812.         if (type.equals(TRUMP_PRODUCT_TYPE)) {
  813.             // version specific trump
  814.             ProductId trumpedId = null;
  815.             ProductSummary trumpedSummary = null;
  816.             if (productSummary.isDeleted()) {
  817.                 // deleting version specific trump
  818.                 // reset preferred weight of reference product
  819.                 // (is is a link in previous version of trump)
  820.                 trumpedId = getTrumpedProductId(prevSummary);
  821.                 if (trumpedId == null) {
  822.                     LOGGER.warning("Unable to process trump delete, "
  823.                             + "missing 'product' link from previous version");
  824.                     return event;
  825.                 }
  826.                 trumpedSummary = getProductSummaryById(trumpedId);
  827.                 if (trumpedSummary == null) {
  828.                     // no matching product in index, possibly already deleted
  829.                     return event;
  830.                 }
  831.                 // resummarize product
  832.                 event = resummarizeProduct(event, trumpedSummary);
  833.             } else {
  834.                 // updating product weight
  835.                 trumpedId = getTrumpedProductId(productSummary);
  836.                 Long weight = Long.valueOf(productSummary.getProperties().get(
  837.                         "weight"));
  838.                 if (trumpedId == null || weight == null) {
  839.                     LOGGER.warning("Invalid trump, "
  840.                             + "missing 'product' link or 'weight' property");
  841.                     return event;
  842.                 }
  843.                 trumpedSummary = getProductSummaryById(trumpedId);
  844.                 if (trumpedSummary == null) {
  845.                     // no matching product in index, possibly already deleted
  846.                     LOGGER.info("Unable to process trump, " + "product '"
  847.                             + trumpedId.toString() + "' not found");
  848.                     return event;
  849.                 }
  850.                 event = setSummaryWeight(event, trumpedSummary, weight);
  851.             }
  852.         } else {
  853.             return checkForPersistentTrump(event, productSummary);
  854.         }

  855.         return event;
  856.     }

  857.     /**
  858.      * Check for, and handle persistent trump products.
  859.      *
  860.      * PERSISTENT TRUMP
  861.      *
  862.      * Persistent trump products include:
  863.      * - a type "trump-PRODUCT" where PRODUCT is the type of product receiving
  864.      * trump.
  865.      * - a property "trump-source" that is the source of product receiving trump.
  866.      * - a property "trump-code" that is the code of product receiving trump.
  867.      *
  868.      * Steps:
  869.      *
  870.      * 1) Find preferred persistent trump product for product being associated
  871.      *      (may be a persistent trump product being associated)
  872.      * 2) If a non-trump product being associated,
  873.      *      stop processing if not affected by trump.
  874.      * 3) set TRUMP_PREFERRED_WEIGHT on product referenced by preferred
  875.      *      persistent trump product; resummarize any other product that has
  876.      *      TRUMP_PREFERRED_WEIGHT.
  877.      *
  878.      * Preconditions:
  879.      * <ul>
  880.      * <li>The "trump" type product must associate with the correct event on its
  881.      * own. The admin pages accomplish this by sending
  882.      * eventsource/eventsourcecode of the associated product.</li>
  883.      * </ul>
  884.      *
  885.      * @param event
  886.      *            current event being updated.
  887.      * @param productSummary
  888.      *            product summary associated with event.
  889.      * @return updated event, or same event if not updated.
  890.      * @throws Exception
  891.      */
  892.     private Event checkForPersistentTrump(final Event event,
  893.             final ProductSummary productSummary) throws Exception {
  894.         Event updatedEvent = event;

  895.         // the type of product currently being indexed
  896.         String type = productSummary.getType();
  897.         // the "trump-TYPE" product type
  898.         String persistentTrumpType = null;
  899.         // whether productSummary is a persistent trump product
  900.         boolean associatingTrump = false;
  901.         // the product source receiving trump
  902.         String trumpSource = null;
  903.         // the product type receiving trump
  904.         String trumpType = null;
  905.         // the product code receiving trump
  906.         String trumpCode = null;

  907.         // determine persistentTrumpType and trumpType
  908.         if (type.startsWith(PERSISTENT_TRUMP_PREFIX)) {
  909.             // incoming trump product
  910.             persistentTrumpType = type;
  911.             trumpType = type.replace(PERSISTENT_TRUMP_PREFIX, "");
  912.             associatingTrump = true;
  913.             // always set persistent trump preferred weight to 1,
  914.             // so most recent updateTime is most preferred
  915.             updatedEvent = setSummaryWeight(updatedEvent, productSummary, 1L);
  916.         } else {
  917.             // incoming product, possibly affected by existing trump
  918.             persistentTrumpType = PERSISTENT_TRUMP_PREFIX + type;
  919.             trumpType = type;
  920.         }

  921.         // find active persistent trump product for type
  922.         ProductSummary persistentTrump = updatedEvent.getPreferredProduct(
  923.                 persistentTrumpType);
  924.         if (persistentTrump != null) {
  925.             trumpSource = persistentTrump.getProperties().get("trump-source");
  926.             trumpCode = persistentTrump.getProperties().get("trump-code");
  927.         }

  928.         // if a non-trump product is coming in,
  929.         // only continue processing if it is affected by persistentTrump.
  930.         // (otherwise weights should already be set)
  931.         if (!associatingTrump &&
  932.                 !(productSummary.getSource().equals(trumpSource)
  933.                 && productSummary.getCode().equals(trumpCode))) {
  934.             // not affected by trump
  935.             return event;
  936.         }

  937.         // update products affected by trump
  938.         List<ProductSummary> products = updatedEvent.getProducts(trumpType);
  939.         if (products != null) {
  940.             for (ProductSummary summary : products) {
  941.                 if (summary.getSource().equals(trumpSource)
  942.                         && summary.getCode().equals(trumpCode)) {
  943.                     // add trump to product
  944.                     updatedEvent = setSummaryWeight(updatedEvent, summary,
  945.                             TRUMP_PREFERRED_WEIGHT);
  946.                 } else if (summary.getPreferredWeight() == TRUMP_PREFERRED_WEIGHT) {
  947.                     // remove trump from previously trumped product.
  948.                     updatedEvent = resummarizeProduct(updatedEvent, summary);
  949.                 }
  950.             }
  951.         }
  952.         // return updated event
  953.         return updatedEvent;
  954.     }

  955.     /**
  956.      * Get the productId referred to by a trump product.
  957.      *
  958.      * @param trumpSummary
  959.      *            trump product with reference to product id.
  960.      * @return product id, or null if unable to parse product id.
  961.      */
  962.     protected ProductId getTrumpedProductId(final ProductSummary trumpSummary) {
  963.         try {
  964.             // use 'product' link from previous summary
  965.             ProductId trumpedId = ProductId.parse(trumpSummary.getLinks().get(
  966.                     "product").get(0).toString());
  967.             return trumpedId;
  968.         } catch (Exception e) {
  969.             return null;
  970.         }
  971.     }

  972.     /**
  973.      * Get a product summary object using its product id.
  974.      *
  975.      * @param id
  976.      *            id to find.
  977.      * @return matching product summary or null.
  978.      * @throws Exception if error occurs
  979.      */
  980.     protected ProductSummary getProductSummaryById(final ProductId id)
  981.             throws Exception {
  982.         ProductIndexQuery query = new ProductIndexQuery();
  983.         query.getProductIds().add(id);
  984.         List<ProductSummary> summaries = productIndex.getProducts(query);
  985.         if (summaries.size() > 0) {
  986.             return summaries.get(0);
  987.         }
  988.         return null;
  989.     }

  990.     /**
  991.      * Update a product summary weight
  992.      *
  993.      * @param event
  994.      *            the event.
  995.      * @param summary
  996.      *            the summary.
  997.      * @param preferredWeight
  998.      *            the weight to set.
  999.      * @return event with updated summary.
  1000.      * @throws Exception if error occurs
  1001.      */
  1002.     protected Event setSummaryWeight(Event event, ProductSummary summary,
  1003.             final Long preferredWeight) throws Exception {
  1004.         if (summary.getPreferredWeight() == preferredWeight) {
  1005.             // already set
  1006.             return event;
  1007.         }

  1008.         LOGGER.info("Setting product preferred weight "
  1009.                 + summary.getId().toString() + ", weight "
  1010.                 + summary.getPreferredWeight() + " (old) => " + preferredWeight
  1011.                 + " (new)");

  1012.         // remove existing summary from event
  1013.         event = productIndex.removeAssociation(event, summary);
  1014.         productIndex.removeProductSummary(summary);
  1015.         // set custom weight
  1016.         summary.setPreferredWeight(preferredWeight);
  1017.         // add updated summary to event
  1018.         summary = productIndex.addProductSummary(summary);
  1019.         event = productIndex.addAssociation(event, summary);
  1020.         // return updated event
  1021.         return event;
  1022.     }

  1023.     /**
  1024.      * Resummarize a product within an event.
  1025.      *
  1026.      * @param event
  1027.      *            the event.
  1028.      * @param summary
  1029.      *            the summary.
  1030.      * @return event with updated summary.
  1031.      * @throws Exception if error occurs
  1032.      */
  1033.     protected Event resummarizeProduct(final Event event,
  1034.             final ProductSummary summary) throws Exception {
  1035.         Event updatedEvent = null;
  1036.         ProductSummary updatedSummary = null;
  1037.         // remove existing summary from event
  1038.         updatedEvent = productIndex.removeAssociation(event, summary);
  1039.         productIndex.removeProductSummary(summary);
  1040.         // use module to summarize original product
  1041.         Product product = productStorage.getProduct(summary.getId());
  1042.         if (product == null) {
  1043.             throw new Exception("Unable to resummarize product, "
  1044.                     + "product not in storage " + summary.getId().toString());
  1045.         }
  1046.         updatedSummary = getModule(product).getProductSummary(product);
  1047.         LOGGER.info("Resummarizing product " + summary.getId().toString()
  1048.                 + ", weight " + summary.getPreferredWeight() + " (old) => "
  1049.                 + updatedSummary.getPreferredWeight() + " (new)");
  1050.         // add updated summary to event
  1051.         updatedSummary = productIndex.addProductSummary(updatedSummary);
  1052.         updatedEvent = productIndex
  1053.                 .addAssociation(updatedEvent, updatedSummary);
  1054.         // return updated event
  1055.         return updatedEvent;
  1056.     }

  1057.     /**
  1058.      * Check for event splits (and split them if needed).
  1059.      *
  1060.      * @param summary
  1061.      *            the summary the indexer is currently processing.
  1062.      * @param originalEvent
  1063.      *            the event before the indexer made any changes.
  1064.      * @param updatedEvent
  1065.      *            the event after the indexer made any changes.
  1066.      * @return List of changes made during this method.
  1067.      * @throws Exception if error occurs
  1068.      */
  1069.     protected synchronized List<IndexerChange> checkForEventSplits(
  1070.             final ProductSummary summary, final Event originalEvent,
  1071.             final Event updatedEvent) throws Exception {
  1072.         List<IndexerChange> changes = new ArrayList<IndexerChange>();

  1073.         // save reference so we can check later if this has changed
  1074.         Event splitEvent = updatedEvent;

  1075.         // ## 1) Split event into sub events
  1076.         Map<String, Event> subEvents = splitEvent.getSubEvents();
  1077.         if (subEvents.size() == 1) {
  1078.             // still only one event, cannot split
  1079.             return changes;
  1080.         }

  1081.         // the original eventid before the indexer started processing this
  1082.         // update (may have already changed in updatedEvent).
  1083.         String originalEventId = originalEvent.getEventId();

  1084.         // ## 2) See if sub events still associate
  1085.         // list of events that actually are split
  1086.         List<Event> alreadySplit = new ArrayList<Event>();

  1087.         // see how sub events associate compared to this event (since the
  1088.         // original event is the one that should be "UPDATED")
  1089.         Event originalSubEvent = subEvents.remove(originalEventId);
  1090.         if (originalSubEvent == null) {
  1091.             // wtf
  1092.             // this should always exist because its ID was returned by getEventId,
  1093.             // should should have at least one product.  Log:
  1094.             LOGGER.warning("[" + getName() + "] originalSubEvent is null"
  1095.                     + ", originalEventId=" + originalEventId);
  1096.             for (final String id: subEvents.keySet()) {
  1097.                 final Event subEvent = subEvents.get(id);
  1098.                 subEvent.log(LOGGER);
  1099.             }
  1100.             return changes;
  1101.         }

  1102.         Iterator<String> subEventsIter = new ArrayList<String>(
  1103.                 subEvents.keySet()).iterator();
  1104.         while (subEventsIter.hasNext()) {
  1105.             String nextEventId = subEventsIter.next();
  1106.             Event nextEvent = subEvents.get(nextEventId);

  1107.             if (!originalSubEvent.isAssociated(nextEvent, associator)) {
  1108.                 // not associated, so split
  1109.                 splitEvent = splitEvents(splitEvent, nextEvent);

  1110.                 // see if associated to any that already split
  1111.                 Iterator<Event> alreadySplitIter = alreadySplit.iterator();
  1112.                 while (alreadySplitIter.hasNext()) {
  1113.                     Event alreadySplitEvent = alreadySplitIter.next();
  1114.                     if (alreadySplitEvent.isAssociated(nextEvent, associator)) {
  1115.                         // need to merge with event that already split

  1116.                         // will need reference to alreadySplitEvent, so keep
  1117.                         // reference to merged event in nextEvent
  1118.                         nextEvent = mergeEvents(alreadySplitEvent, nextEvent);
  1119.                         // remove original already split
  1120.                         alreadySplit.remove(alreadySplitEvent);
  1121.                         // add merged nextEvent
  1122.                         alreadySplit.add(nextEvent);
  1123.                         // signal that nextEvent was already added to
  1124.                         // alreadySplit
  1125.                         nextEvent = null;
  1126.                         // associated, and one at a time, so stop checking
  1127.                         break;
  1128.                     }
  1129.                 }

  1130.                 if (nextEvent != null) {
  1131.                     // wasn't merged with an already split event
  1132.                     alreadySplit.add(nextEvent);
  1133.                 }
  1134.             }
  1135.         }
  1136.         if (alreadySplit.size() == 0) {
  1137.             // didn't split any events...
  1138.             return changes;
  1139.         }

  1140.         // ## 3) Build list of Indexer changes that actually happened.
  1141.         String splitEventId = splitEvent.getEventId();

  1142.         if (!originalEventId.equalsIgnoreCase(splitEventId)) {
  1143.             LOGGER.warning("[" + getName() + "] eventid (" + splitEventId
  1144.                     + ") no longer matches original (" + originalEventId
  1145.                     + ") after split.");
  1146.         }

  1147.         // first notify about updated original event
  1148.         changes.add(new IndexerChange(
  1149.                 (splitEvent.isDeleted() ? IndexerChange.EVENT_DELETED
  1150.                         : IndexerChange.EVENT_UPDATED), originalEvent,
  1151.                 splitEvent));

  1152.         // now notify about all events that split from original event
  1153.         Iterator<Event> alreadySplitIter = alreadySplit.iterator();
  1154.         while (alreadySplitIter.hasNext()) {
  1155.             Event alreadySplitEvent = alreadySplitIter.next();
  1156.             changes.add(new IndexerChange(IndexerChange.EVENT_SPLIT, null,
  1157.                     alreadySplitEvent));
  1158.         }

  1159.         // done
  1160.         return changes;
  1161.     }

  1162.     /**
  1163.      * Removes the leaf event (and all its products) from the root event. This
  1164.      * method modifies the runtime objects as well as updating the index DB.
  1165.      *
  1166.      * @param root
  1167.      *            The root event from which all leaf products will be removed
  1168.      * @param leaf
  1169.      *            The event (with products) that will be removed from the root
  1170.      * @return copy of root without the products that have been removed. The
  1171.      *         indexId property of leaf is updated to its new value.
  1172.      * @throws Exception if error occurs
  1173.      */
  1174.     protected synchronized Event splitEvents(final Event root, final Event leaf)
  1175.             throws Exception {
  1176.         Event updated = root;
  1177.         Iterator<ProductSummary> leafProducts = leaf.getProductList()
  1178.                 .iterator();

  1179.         // assign leaf indexId by reference
  1180.         Event insertedLeafEvent = productIndex.addEvent(leaf);
  1181.         leaf.setIndexId(insertedLeafEvent.getIndexId());

  1182.         while (leafProducts.hasNext()) {
  1183.             ProductSummary product = leafProducts.next();
  1184.             if (updated != null) {
  1185.                 updated = productIndex.removeAssociation(updated, product);
  1186.             }
  1187.             // leaf already has the product in its list, not returning anyways.
  1188.             productIndex.addAssociation(leaf, product);
  1189.         }

  1190.         return updated;
  1191.     }

  1192.     /**
  1193.      * Merges the child event (and all its products) into the target event. If
  1194.      * the child event attempts to merge in a product that is the same as one
  1195.      * already associated to the target event, the child version of the product
  1196.      * takes precedence. Note: This only applies when the target and child
  1197.      * product have the same type, code, source, and update time; i.e. the
  1198.      * products are duplicates. This method modifies the runtime objects as well
  1199.      * as the index DB. The child event is then deleted.
  1200.      *
  1201.      * @param target
  1202.      *            The target event into which the child is merged.
  1203.      * @param child
  1204.      *            The child event to be merged into the target.
  1205.      * @return the updated event
  1206.      * @throws Exception if error occurs
  1207.      */
  1208.     protected synchronized Event mergeEvents(final Event target,
  1209.             final Event child) throws Exception {
  1210.         Iterator<ProductSummary> childProducts = child.getProductList()
  1211.                 .iterator();
  1212.         Event updatedEvent = target;
  1213.         Event updatedChild = child;

  1214.         while (childProducts.hasNext()) {
  1215.             ProductSummary product = childProducts.next();
  1216.             productIndex.removeAssociation(child, product);
  1217.             updatedChild = productIndex
  1218.                     .removeAssociation(updatedChild, product);
  1219.             updatedEvent = productIndex.addAssociation(updatedEvent, product);
  1220.         }

  1221.         productIndex.removeEvent(updatedChild);

  1222.         return updatedEvent;
  1223.     }

  1224.     /**
  1225.      * Check and merge any nearby events or previously unassociated products
  1226.      * that now associate.
  1227.      *
  1228.      * @param summary
  1229.      *            the summary currently being processed by the indexer.
  1230.      * @param originalEvent
  1231.      *            the event before any changes.
  1232.      * @param updatedEvent
  1233.      *            the event after the summary was associated.
  1234.      * @return list of any merge type changes.
  1235.      * @throws Exception if error occurs
  1236.      */
  1237.     protected synchronized List<IndexerChange> checkForEventMerges(
  1238.             final ProductSummary summary, final Event originalEvent,
  1239.             final Event updatedEvent) throws Exception {
  1240.         List<IndexerChange> changes = new ArrayList<IndexerChange>();
  1241.         Event mergedEvent = updatedEvent;

  1242.         // ## 1) Check for nearby events
  1243.         if (originalEvent != null) {
  1244.             // only if the event was not just created, because otherwise this
  1245.             // product would have associated to an existing event

  1246.             // build the query
  1247.             EventSummary mergedSummary = mergedEvent.getEventSummary();
  1248.             ProductIndexQuery nearbyEvents = associator.getLocationQuery(
  1249.                     mergedSummary.getTime(), mergedSummary.getLatitude(),
  1250.                     mergedSummary.getLongitude());
  1251.             if (associateUsingCurrentProducts && nearbyEvents != null) {
  1252.                 nearbyEvents.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
  1253.             }

  1254.             LOGGER.finer("[" + getName() + "] searching for nearby events");
  1255.             // do the search
  1256.             Iterator<Event> events = productIndex.getEvents(nearbyEvents)
  1257.                     .iterator();
  1258.             LOGGER.finer("[" + getName()
  1259.                     + "] search for nearby events complete");
  1260.             while (events.hasNext()) {
  1261.                 Event foundEvent = events.next();
  1262.                 if (foundEvent.getIndexId().equals(mergedEvent.getIndexId())) {
  1263.                     // found the event currently being checked for merges,
  1264.                     // ignore
  1265.                     continue;
  1266.                 } else if (mergedEvent.isAssociated(foundEvent, associator)) {
  1267.                     // event associates to another event, merge them
  1268.                     mergedEvent = mergeEvents(mergedEvent, foundEvent);
  1269.                     changes.add(new IndexerChange(IndexerChange.EVENT_MERGED,
  1270.                             foundEvent, null));
  1271.                 }
  1272.             }
  1273.         }

  1274.         // ## 2) Now look for products that were previously unassociated, but
  1275.         // that can now associate because of the event id of the incoming
  1276.         // product
  1277.         // (if the event already had this id, these products would already be
  1278.         // associated...)
  1279.         String source = summary.getEventSource();
  1280.         String sourceCode = summary.getEventSourceCode();
  1281.         // without this check, all unassociated products would be added...(BAD)
  1282.         if (source != null && sourceCode != null) {
  1283.             // build the query
  1284.             ProductIndexQuery unassociatedProducts = new ProductIndexQuery();
  1285.             unassociatedProducts.setEventSource(source);
  1286.             unassociatedProducts.setEventSourceCode(sourceCode);

  1287.             // run the query
  1288.             LOGGER.finer("[" + getName()
  1289.                     + "] searching for unassociated products");
  1290.             Iterator<ProductSummary> summaries = productIndex
  1291.                     .getUnassociatedProducts(unassociatedProducts).iterator();
  1292.             LOGGER.finer("[" + getName()
  1293.                     + "] search for unassociated products complete");
  1294.             // add associations
  1295.             while (summaries.hasNext()) {
  1296.                 mergedEvent = productIndex.addAssociation(mergedEvent,
  1297.                         summaries.next());
  1298.             }
  1299.         }

  1300.         // ## 2.5) Check for merge by associate product
  1301.         // only need to check when associate product is first added
  1302.         // THIS IMPLEMENTATION ASSUMES: both events exist when associate product
  1303.         // is sent. Search for existing event (during getPrevEvent) does not
  1304.         // search associate products othereventsource or othereventsourcecode
  1305.         // properties.
  1306.         if (summary.getType().equals(Event.ASSOCIATE_PRODUCT_TYPE)
  1307.                 && !summary.isDeleted()) {
  1308.             String otherEventSource = summary.getProperties().get(
  1309.                     Event.OTHEREVENTSOURCE_PROPERTY);
  1310.             String otherEventSourceCode = summary.getProperties().get(
  1311.                     Event.OTHEREVENTSOURCECODE_PROPERTY);

  1312.             if (otherEventSource == null || otherEventSourceCode == null) {
  1313.                 LOGGER.warning(Event.ASSOCIATE_PRODUCT_TYPE
  1314.                         + " product without " + Event.OTHEREVENTSOURCE_PROPERTY
  1315.                         + " or " + Event.OTHEREVENTSOURCECODE_PROPERTY
  1316.                         + " properties, ignoring");
  1317.             } else {
  1318.                 // search for associated event
  1319.                 ProductIndexQuery associateQuery = new ProductIndexQuery();
  1320.                 associateQuery.setEventSource(otherEventSource);
  1321.                 associateQuery.setEventSourceCode(otherEventSourceCode);

  1322.                 if (associateUsingCurrentProducts) {
  1323.                     associateQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
  1324.                 }

  1325.                 LOGGER.finer("[" + getName()
  1326.                         + "] searching for associated event");
  1327.                 // do the search
  1328.                 Iterator<Event> events = productIndex.getEvents(associateQuery)
  1329.                         .iterator();
  1330.                 LOGGER.finer("[" + getName()
  1331.                         + "] search for associated event complete");
  1332.                 while (events.hasNext()) {
  1333.                     Event foundEvent = events.next();
  1334.                     if (foundEvent.getIndexId()
  1335.                             .equals(mergedEvent.getIndexId())) {
  1336.                         // found the event currently being checked for merges,
  1337.                         // ignore
  1338.                         continue;
  1339.                     } else if (mergedEvent.isAssociated(foundEvent)) {
  1340.                         // event associates to another event, merge them
  1341.                         mergedEvent = mergeEvents(mergedEvent, foundEvent);
  1342.                         changes.add(new IndexerChange(
  1343.                                 IndexerChange.EVENT_MERGED, foundEvent, null));
  1344.                     }
  1345.                 }
  1346.             }
  1347.         }

  1348.         // ## 4) Check if the event has changed during this method
  1349.         if (mergedEvent != updatedEvent) {
  1350.             // something has changed, so add an IndexerChange
  1351.             if (originalEvent == null) {
  1352.                 // no previous event, it was added (although unassociated
  1353.                 // products were associated)
  1354.                 changes.add(new IndexerChange(IndexerChange.EVENT_ADDED, null,
  1355.                         mergedEvent));
  1356.             } else {
  1357.                 // may have merged with other events, or associated unassociated
  1358.                 // products. Other changes represent the merges, so just
  1359.                 // indicate update/delete.
  1360.                 changes.add(new IndexerChange(
  1361.                         (mergedEvent.isDeleted() ? IndexerChange.EVENT_DELETED
  1362.                                 : IndexerChange.EVENT_UPDATED), originalEvent,
  1363.                         mergedEvent));
  1364.             }
  1365.         }

  1366.         return changes;
  1367.     }

  1368.     /**
  1369.      * Takes a summary return the previous
  1370.      * @param summary A product summary
  1371.      * @return The previous summary
  1372.      * @throws Exception if error occurs
  1373.      */
  1374.     protected synchronized ProductSummary getPrevProductVersion(
  1375.             ProductSummary summary) throws Exception {
  1376.         ProductSummary prevSummary = null;
  1377.         List<ProductSummary> candidateSummaries = null;
  1378.         ProductIndexQuery query = new ProductIndexQuery();

  1379.         // Set type, code and source
  1380.         query.setProductType(summary.getType());
  1381.         query.setProductCode(summary.getCode());
  1382.         query.setProductSource(summary.getSource());

  1383.         // Query the index (first look for associated products)
  1384.         candidateSummaries = productIndex.getProducts(query);

  1385.         if (candidateSummaries == null || candidateSummaries.size() == 0) {
  1386.             // No summaries found associated to events, try unassociated.
  1387.             candidateSummaries = productIndex.getUnassociatedProducts(query);
  1388.         }

  1389.         if (candidateSummaries != null && candidateSummaries.size() > 0) {
  1390.             prevSummary = candidateSummaries.get(0);
  1391.             if (candidateSummaries.size() != 1) {
  1392.                 LOGGER.warning(
  1393.                         "[" + getName() + "] " + summary.getId().toString() +
  1394.                         ": More than one existing summary is claiming to be most recent.");
  1395.             }
  1396.         }
  1397.         return prevSummary;
  1398.     }

  1399.     /**
  1400.      * Associate products are processed during
  1401.      * {@link #checkForEventMerges(ProductSummary, Event, Event)} and are
  1402.      * ignored during this method.
  1403.      *
  1404.      * @see Associator#getSearchRequest(ProductSummary)
  1405.      * @see Associator#chooseEvent(List, ProductSummary)
  1406.      *
  1407.      * @param summary ProductSummary
  1408.      * @return Event to which a productSummary is associated, or null if not
  1409.      *         found.
  1410.      * @throws Exception if error occurs
  1411.      */
  1412.     protected synchronized Event getPrevEvent(ProductSummary summary)
  1413.             throws Exception {
  1414.         return getPrevEvent(summary, false);
  1415.     }

  1416.     /**
  1417.      * Find an existing event that summary should associate with.
  1418.      *
  1419.      * @param summary the previous event.
  1420.      * @param associating whether associating (vs archiving).
  1421.      * @return previous event, or null if none found.
  1422.      * @throws Exception if error occurs
  1423.      */
  1424.     protected synchronized Event getPrevEvent(ProductSummary summary,
  1425.             boolean associating) throws Exception {
  1426.         Event prevEvent = null;
  1427.         List<Event> candidateEvents = null;

  1428.         SearchRequest request = associator.getSearchRequest(summary);

  1429.         if (associating && associateUsingCurrentProducts) {
  1430.             for (SearchQuery query : request.getQueries()) {
  1431.                 query.getProductIndexQuery().setResultType(
  1432.                         ProductIndexQuery.RESULT_TYPE_CURRENT);
  1433.             }
  1434.         }

  1435.         SearchResponse response = search(request);
  1436.         if (response != null) {
  1437.             candidateEvents = response.getEvents();
  1438.         }

  1439.         if (candidateEvents != null && candidateEvents.size() > 0) {
  1440.             // Found some events. Find best match.
  1441.             prevEvent = associator.chooseEvent(candidateEvents, summary);
  1442.         }

  1443.         return prevEvent;
  1444.     }

  1445.     /*
  1446.      * protected IndexerEvent createIndexerEvent(ProductSummary prevSummary,
  1447.      * Event prevEvent, ProductSummary summary, Event event) { IndexerType type
  1448.      * = null; IndexerEvent indexerEvent = new IndexerEvent(this);
  1449.      *
  1450.      * // ---------------------------------- // Determine the type if
  1451.      * IndexerEvent // ----------------------------------
  1452.      *
  1453.      * if (summary.getStatus() == Product.STATUS_DELETE) { type =
  1454.      * IndexerEvent.PRODUCT_DELETED; if (event != null) { // Since we have an
  1455.      * event, this is now an EVENT_UPDATED type type =
  1456.      * IndexerEvent.EVENT_UPDATED;
  1457.      *
  1458.      * // Check if all products on event are deleted. if
  1459.      * (event.getProductList().size() == 0) { type = IndexerEvent.EVENT_DELETED;
  1460.      * } } } else { // Product was not a "DELETE" status. Must be an added or
  1461.      * updated. if (prevEvent == null && event != null) { type =
  1462.      * IndexerEvent.EVENT_ADDED; } else if (prevEvent != null && event != null)
  1463.      * { type = IndexerEvent.EVENT_UPDATED; } else if (prevSummary == null &&
  1464.      * summary != null) { type = IndexerEvent.PRODUCT_ADDED; } else if
  1465.      * (prevSummary != null && summary != null) { type =
  1466.      * IndexerEvent.PRODUCT_UPDATED; }
  1467.      *
  1468.      * if (summary == null) { // Not sure how this happens.
  1469.      * LOGGER.warning("Trying to notify of a null summary."); } }
  1470.      *
  1471.      * // Set parameters indexerEvent.setEventType(type);
  1472.      * indexerEvent.setOldEvent(prevEvent); indexerEvent.setSummary(summary);
  1473.      * indexerEvent.setEvent(event);
  1474.      *
  1475.      * return indexerEvent; }
  1476.      */
  1477.     /**
  1478.      * Loads parent, specific, and dependent configurations; in that order.
  1479.      */
  1480.     @Override
  1481.     public synchronized void configure(Config config) throws Exception {
  1482.         // -- Load parent configurations -- //
  1483.         super.configure(config);

  1484.         // reads properties from same config section
  1485.         defaultModule.getSignatureVerifier().configure(config);

  1486.         // -- Load specific configurations -- //
  1487.         String associatorName = config.getProperty(ASSOCIATOR_CONFIG_PROPERTY);
  1488.         if (associatorName != null) {
  1489.             associator = (Associator) Config.getConfig().getObject(
  1490.                     associatorName);
  1491.         }

  1492.         String storageName = config.getProperty(STORAGE_CONFIG_PROPERTY);
  1493.         String storageDirectory = config
  1494.                 .getProperty(STORAGE_DIRECTORY_CONFIG_PROPERTY);
  1495.         if (storageName != null) {
  1496.             LOGGER.config("[" + getName() + "] loading ProductStorage '"
  1497.                     + storageName + "'");
  1498.             productStorage = (ProductStorage) Config.getConfig().getObject(
  1499.                     storageName);
  1500.             if (productStorage == null) {
  1501.                 throw new ConfigurationException("[" + getName()
  1502.                         + "] ProductStorage '" + storageName
  1503.                         + "' is not properly configured");
  1504.             }
  1505.         } else if (storageDirectory != null) {
  1506.             LOGGER.config("[" + getName() + "] using storage directory '"
  1507.                     + storageDirectory + "'");
  1508.             productStorage = new FileProductStorage(new File(storageDirectory));
  1509.         } else {
  1510.             productStorage.configure(config);
  1511.         }

  1512.         String indexName = config.getProperty(INDEX_CONFIG_PROPERTY);
  1513.         String indexFileName = config.getProperty(INDEXFILE_CONFIG_PROPERTY);
  1514.         if (indexName != null) {
  1515.             LOGGER.config("[" + getName() + "] loading ProductIndex '"
  1516.                     + indexName + "'");
  1517.             productIndex = (ProductIndex) Config.getConfig().getObject(
  1518.                     indexName);
  1519.             if (productIndex == null) {
  1520.                 throw new ConfigurationException("[" + getName()
  1521.                         + "] ProductIndex '" + indexName
  1522.                         + "' is not properly configured");
  1523.             }
  1524.         } else if (indexFileName != null) {
  1525.             LOGGER.config("[" + getName() + "] using sqlite product index '"
  1526.                     + indexFileName + "'");
  1527.             productIndex = new JDBCProductIndex(indexFileName);
  1528.         } else {
  1529.             productIndex.configure(config);
  1530.         }

  1531.         // How often to check for expired products
  1532.         String archivePolicy = config
  1533.                 .getProperty(INDEX_ARCHIVE_POLICY_PROPERTY);
  1534.         if (archivePolicy != null) {
  1535.             Iterator<String> iter = StringUtils.split(archivePolicy, ",")
  1536.                     .iterator();
  1537.             while (iter.hasNext()) {
  1538.                 String policyName = iter.next();

  1539.                 LOGGER.config("[" + getName() + "] loading ArchivePolicy '"
  1540.                         + policyName + "'");
  1541.                 ArchivePolicy policy = (ArchivePolicy) Config.getConfig()
  1542.                         .getObject(policyName);
  1543.                 if (policy == null) {
  1544.                     throw new ConfigurationException("[" + getName()
  1545.                             + "] ArchivePolicy '" + policyName
  1546.                             + "' is not configured properly");
  1547.                 }

  1548.                 // Only use archive policies that are valid
  1549.                 if (policy.isValidPolicy()) {
  1550.                     archivePolicies.add(policy);
  1551.                 } else {
  1552.                     LOGGER.warning("[" + getName() + "] ArchivePolicy '"
  1553.                             + policyName + "' is not valid");
  1554.                 }
  1555.             }
  1556.         }

  1557.         // How often should the archive policies be run
  1558.         String buffer = config.getProperty(INDEX_ARCHIVE_INTERVAL_PROPERTY);
  1559.         if (buffer != null) {
  1560.             archiveInterval = Long.parseLong(buffer);
  1561.         } else {
  1562.             // Use default age
  1563.             archiveInterval = INDEX_ARCHIVE_INTERVAL_DEFAULT;
  1564.         }
  1565.         LOGGER.config("[" + getName() + "] archive interval is '"
  1566.                 + archiveInterval + "'");

  1567.         // Always use at least a default indexer module
  1568.         String moduleNames = config.getProperty(MODULES_CONFIG_PROPERTY);
  1569.         if (moduleNames != null) {
  1570.             Iterator<String> modules = StringUtils.split(moduleNames, ",")
  1571.                     .iterator();
  1572.             while (modules.hasNext()) {
  1573.                 String moduleName = modules.next();
  1574.                 if (moduleName.equals("")) {
  1575.                     continue;
  1576.                 }
  1577.                 LOGGER.config("[" + getName() + "] loading indexer module '"
  1578.                         + moduleName + "'");
  1579.                 IndexerModule module = (IndexerModule) Config.getConfig()
  1580.                         .getObject(moduleName);
  1581.                 if (module == null) {
  1582.                     throw new ConfigurationException("[" + getName()
  1583.                             + "] indexer module '" + moduleName
  1584.                             + "' is not configured properly");
  1585.                 }
  1586.                 addModule(module);
  1587.             }
  1588.         } else {
  1589.             LOGGER.config("[" + getName() + "] no indexer modules configured.");
  1590.         }

  1591.         String listenerNames = config.getProperty(LISTENERS_CONFIG_PROPERTY);
  1592.         if (listenerNames != null) {
  1593.             Iterator<String> listeners = StringUtils.split(listenerNames, ",")
  1594.                     .iterator();
  1595.             while (listeners.hasNext()) {
  1596.                 String listenerName = listeners.next();
  1597.                 if (listenerName.equals("")) {
  1598.                     continue;
  1599.                 }
  1600.                 LOGGER.config("[" + getName() + "] loading indexer listener '"
  1601.                         + listenerName + "'");
  1602.                 IndexerListener listener = (IndexerListener) Config.getConfig()
  1603.                         .getObject(listenerName);
  1604.                 if (listener == null) {
  1605.                     throw new ConfigurationException("[" + getName()
  1606.                             + "] indexer listener '" + listenerName
  1607.                             + "' is not configured properly");
  1608.                 }
  1609.                 addListener(listener);
  1610.             }
  1611.         } else {
  1612.             LOGGER.config("[" + getName()
  1613.                     + "] no indexer listeners configured.");
  1614.         }

  1615.         String localRegions = config.getProperty(LOCAL_REGIONS_PROPERTY,
  1616.                 DEFAULT_LOCAL_REGIONS);
  1617.         this.localRegionsFile = new File(localRegions);
  1618.         LOGGER.config("[" + getName() + "] Local regions file: "
  1619.                 + this.localRegionsFile);

  1620.         String enableSearch = config.getProperty(ENABLE_SEARCH_PROPERTY,
  1621.                 DEFAULT_ENABLE_SEARCH);
  1622.         if (Boolean.valueOf(enableSearch)) {
  1623.             searchSocket = new SearchServerSocket();
  1624.             searchSocket.setIndex(this);

  1625.             int searchPort = Integer.parseInt(config.getProperty(
  1626.                     SEARCH_PORT_PROPERTY, DEFAULT_SEARCH_PORT));
  1627.             searchSocket.setPort(searchPort);

  1628.             int searchThreads = Integer.parseInt(config.getProperty(
  1629.                     SEARCH_THREADS_PROPERTY, DEFAULT_SEARCH_THREADS));
  1630.             searchSocket.setThreads(searchThreads);

  1631.             LOGGER.config("[" + getName()
  1632.                     + "] SearchServerSocket running at localhost:" + searchPort
  1633.                     + ", with " + searchThreads + " threads");
  1634.         }
  1635.         // -- Load dependent configurations -- //

  1636.         associateUsingCurrentProducts = Boolean.valueOf(
  1637.                 config.getProperty(ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY,
  1638.                 DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS));
  1639.         LOGGER.config("[" + getName() + "] associateUsingCurrentProducts = "
  1640.                 + associateUsingCurrentProducts);
  1641.     }

  1642.     /**
  1643.      * Shuts down the Indexer. The parent shutdown method is called and then all
  1644.      * executor services (from listeners) are shutdown in sequence.
  1645.      */
  1646.     @Override
  1647.     public synchronized void shutdown() throws Exception {
  1648.         // -- Shut down dependent processes -- //
  1649.         try {
  1650.             if (readProductIndex != productIndex) {
  1651.                 readProductIndex.shutdown();
  1652.             }
  1653.         } catch (Exception e) {
  1654.             LOGGER.log(Level.WARNING, "[" + getName()
  1655.                     + "] exception shutting down read product index", e);
  1656.         }
  1657.         try {
  1658.             productIndex.shutdown();
  1659.         } catch (Exception e) {
  1660.             LOGGER.log(Level.WARNING, "[" + getName()
  1661.                     + "] exception shutting down product index", e);
  1662.         }
  1663.         productStorage.shutdown();

  1664.         // ExecutorServices tied to known listeners.
  1665.         Iterator<IndexerListener> iter = listeners.keySet().iterator();
  1666.         while (iter.hasNext()) {
  1667.             IndexerListener listener = iter.next();
  1668.             try {
  1669.                 listeners.get(listener).shutdown();
  1670.             } catch (Exception e) {
  1671.                 LOGGER.log(Level.WARNING, "[" + getName()
  1672.                         + "] exception shutting down listener executor", e);
  1673.             }
  1674.             if (listener instanceof Configurable) {
  1675.                 try {
  1676.                     ((Configurable) listener).shutdown();
  1677.                 } catch (Exception e) {
  1678.                     LOGGER.log(Level.WARNING, "[" + getName()
  1679.                             + "] exception shutting down listener", e);
  1680.                 }
  1681.             }
  1682.         }

  1683.         Iterator<IndexerModule> modules = this.modules.iterator();
  1684.         while (modules.hasNext()) {
  1685.             IndexerModule module = modules.next();
  1686.             if (module instanceof Configurable) {
  1687.                 try {
  1688.                     ((Configurable) module).shutdown();
  1689.                 } catch (Exception e) {
  1690.                     LOGGER.log(Level.WARNING, "[" + getName()
  1691.                             + "] exception shutting down module", e);
  1692.                 }
  1693.             }
  1694.         }
  1695.         // -- Shut down our own specific processes -- //

  1696.         // Shut down our timers if they exist
  1697.         if (archiveTask != null) {
  1698.             archiveTask.cancel();
  1699.             archiveTask = null;
  1700.         }
  1701.         if (archiveTimer != null) {
  1702.             archiveTimer.cancel();
  1703.             archiveTimer = null;
  1704.         }

  1705.         if (searchSocket != null) {
  1706.             searchSocket.shutdown();
  1707.         }
  1708.         // -- Call parent shutdown method -- //
  1709.         super.shutdown();
  1710.     }

  1711.     /**
  1712.      * Starts up the necessary parent, specific, and dependent processes, in
  1713.      * that order.
  1714.      */
  1715.     @Override
  1716.     public synchronized void startup() throws Exception {
  1717.         // -- Call parent startup method -- //
  1718.         super.startup();

  1719.         // -- Start up our own specific processes -- //

  1720.         backgroundService = Executors.newCachedThreadPool();

  1721.         // -- Start dependent processes -- //
  1722.         // ExecutorServices tied to known listeners.
  1723.         Iterator<IndexerListener> iter = listeners.keySet().iterator();
  1724.         while (iter.hasNext()) {
  1725.             IndexerListener listener = iter.next();
  1726.             if (listener instanceof Configurable) {
  1727.                 ((Configurable) listener).startup();
  1728.             }
  1729.         }

  1730.         // configure regions factory before modules
  1731.         ANSSRegionsFactory factory = ANSSRegionsFactory.getFactory(false);
  1732.         factory.setLocalRegions(localRegionsFile);
  1733.         factory.startup();

  1734.         Iterator<IndexerModule> modules = this.modules.iterator();
  1735.         while (modules.hasNext()) {
  1736.             IndexerModule module = modules.next();
  1737.             if (module instanceof Configurable) {
  1738.                 ((Configurable) module).startup();
  1739.             }
  1740.         }

  1741.         // ProductIndex
  1742.         productStorage.startup();
  1743.         productIndex.startup();

  1744.         // if using mysql product index, create separate read index
  1745.         readProductIndex = null;
  1746.         if (productIndex instanceof JDBCProductIndex) {
  1747.             JDBCProductIndex jdbcProductIndex = (JDBCProductIndex) productIndex;
  1748.             if (jdbcProductIndex.getDriver().contains("mysql")) {
  1749.                 readProductIndex = new JDBCProductIndex();
  1750.                 ((JDBCProductIndex) readProductIndex).setDriver(jdbcProductIndex.getDriver());
  1751.                 ((JDBCProductIndex) readProductIndex).setUrl(jdbcProductIndex.getUrl());
  1752.                 readProductIndex.startup();
  1753.             }
  1754.         }
  1755.         if (readProductIndex == null) {
  1756.             // otherwise use same index
  1757.             readProductIndex = productIndex;
  1758.         }

  1759.         // Cleanup thread to purge old products
  1760.         if (archivePolicies.size() > 0) {
  1761.             // Instantiate a timer object
  1762.             archiveTimer = new Timer();
  1763.             // Instantiate the task object
  1764.             archiveTask = new TimerTask() {
  1765.                 public void run() {
  1766.                     try {
  1767.                         int[] counts = purgeExpiredProducts();
  1768.                         LOGGER.info(String
  1769.                                 .format("["
  1770.                                         + getName()
  1771.                                         + "] purged %d expired events and %d expired unassociated products.",
  1772.                                         counts[0], counts[1]));
  1773.                     } catch (Exception ex) {
  1774.                         LOGGER.log(Level.WARNING, "[" + getName()
  1775.                                 + "] indexer cleanup thread threw exception",
  1776.                                 ex);
  1777.                     }
  1778.                 }
  1779.             };
  1780.             // Run archiver immediately at startup, then at regular intervals
  1781.             archiveTimer.schedule(archiveTask, 0L, archiveInterval);
  1782.         }

  1783.         if (searchSocket != null) {
  1784.             searchSocket.startup();
  1785.         }
  1786.     }

  1787.     /**
  1788.      * Checks the index for content that match a configured archive policy.
  1789.      * Events are checked first and matched events are removed along with all
  1790.      * their products. Listeners are notified of each archived event with an
  1791.      * EVENT_ARCHIVED type. Unassociated products are checked next, matched
  1792.      * unassociated products are archived and listeners are notified with
  1793.      * PRODUCT_ARCHIVE type.
  1794.      *
  1795.      * Note: Product "age" is determined by when the earthquake for that product
  1796.      * occurred and does not reflect how long the product has actually been in
  1797.      * the index.
  1798.      *
  1799.      * @see #archivePolicies
  1800.      * @return Int array of size 2
  1801.      * @throws Exception if error occurs
  1802.      */
  1803.     public synchronized int[] purgeExpiredProducts() throws Exception {
  1804.         int[] counts = { 0, 0 };
  1805.         ProductIndexQuery query = null;
  1806.         ArchivePolicy policy = null;

  1807.         if (isDisableArchive()) {
  1808.             LOGGER.info("Archiving disabled");
  1809.             return counts;
  1810.         }

  1811.         for (int i = 0; i < archivePolicies.size(); i++) {
  1812.             policy = archivePolicies.get(i);
  1813.             query = policy.getIndexQuery();

  1814.             if (!(policy instanceof ProductArchivePolicy)) {
  1815.                 // -- Purge expired events for this policy -- //
  1816.                 LOGGER.fine("[" + getName()
  1817.                         + "] running event archive policy (" + policy.getName()
  1818.                         + ")");
  1819.                 try {
  1820.                     // Get a list of those events
  1821.                     List<Event> expiredEvents = productIndex.getEvents(query);

  1822.                     // Loop over list of expired events and remove each one
  1823.                     Iterator<Event> eventIter = expiredEvents.iterator();
  1824.                     while (eventIter.hasNext()) {
  1825.                         Event event = eventIter.next();

  1826.                         LOGGER.info("[" + getName() + "] archiving event "
  1827.                                 + event.getEventId());
  1828.                         event.log(LOGGER);

  1829.                         productIndex.beginTransaction();
  1830.                         try {
  1831.                             removeEvent(event);

  1832.                             // Notify of the event archived
  1833.                             IndexerEvent notification = new IndexerEvent(this);
  1834.                             notification.setSummary(null);
  1835.                             notification.addIndexerChange(new IndexerChange(
  1836.                                     IndexerChange.EVENT_ARCHIVED, event, null));
  1837.                             notifyListeners(notification);

  1838.                             ++counts[0];
  1839.                             productIndex.commitTransaction();
  1840.                         } catch (Exception e) {
  1841.                             LOGGER.log(Level.WARNING, "[" + getName()
  1842.                                     + "] exception archiving event "
  1843.                                     + event.getEventId() + ", rolling back", e);
  1844.                             productIndex.rollbackTransaction();
  1845.                         }
  1846.                     }
  1847.                 } catch (Exception e) {
  1848.                     LOGGER.log(Level.WARNING, "[" + getName()
  1849.                             + "] exception running event archive policy ("
  1850.                             + policy.getName() + ") ", e);
  1851.                 }
  1852.             }

  1853.             if (policy instanceof ProductArchivePolicy) {
  1854.                 ProductArchivePolicy productPolicy = (ProductArchivePolicy) policy;

  1855.                 // -- Purge expired products for this policy -- //
  1856.                 LOGGER.fine("[" + getName()
  1857.                         + "] running product archive policy ("
  1858.                         + policy.getName() + ")");

  1859.                 try {
  1860.                     // Get a list of those products
  1861.                     List<ProductSummary> expiredProducts;

  1862.                     if (productPolicy.isOnlyUnassociated()) {
  1863.                         expiredProducts = productIndex
  1864.                                 .getUnassociatedProducts(query);
  1865.                     } else {
  1866.                         expiredProducts = productIndex.getProducts(query);
  1867.                     }

  1868.                     // Loop over list of expired products and remove each one
  1869.                     Iterator<ProductSummary> productIter = expiredProducts
  1870.                             .iterator();
  1871.                     while (productIter.hasNext()) {
  1872.                         ProductSummary product = productIter.next();

  1873.                         LOGGER.info("[" + getName() + "] archiving product "
  1874.                                 + product.getId().toString());
  1875.                         productIndex.beginTransaction();
  1876.                         try {
  1877.                             removeSummary(product);

  1878.                             // Notify of the product archived
  1879.                             IndexerEvent notification = new IndexerEvent(this);
  1880.                             notification.setSummary(product);
  1881.                             notification.addIndexerChange(new IndexerChange(
  1882.                                     IndexerChange.PRODUCT_ARCHIVED, null, null));
  1883.                             notifyListeners(notification);

  1884.                             ++counts[1];
  1885.                             productIndex.commitTransaction();
  1886.                         } catch (Exception e) {
  1887.                             LOGGER.log(Level.WARNING, "[" + getName()
  1888.                                     + "] exception archiving event "
  1889.                                     + product.getId().toString() + ", rolling back", e);
  1890.                             productIndex.rollbackTransaction();
  1891.                         }
  1892.                     }
  1893.                 } catch (Exception e) {
  1894.                     LOGGER.log(Level.WARNING, "[" + getName()
  1895.                             + "] exception running product archive policy ("
  1896.                             + policy.getName() + ")", e);
  1897.                 }

  1898.             }
  1899.         }

  1900.         return counts;
  1901.     }

  1902.     /**
  1903.      * Removes the given event from the Indexer ProductIndex and ProductStorage.
  1904.      *
  1905.      * @param event event to remove
  1906.      * @throws Exception
  1907.      *             If errors occur while removing the event
  1908.      */
  1909.     protected synchronized void removeEvent(Event event) throws Exception {
  1910.         // Removing an "event" from storage is really just removing all its
  1911.         // associated products
  1912.         List<ProductSummary> summaries = event.getAllProductList();
  1913.         Iterator<ProductSummary> summaryIter = summaries.iterator();
  1914.         while (summaryIter.hasNext()) {
  1915.             ProductSummary summary = summaryIter.next();
  1916.             // Remove product from storage
  1917.             productStorage.removeProduct(summary.getId());
  1918.             // Remove product summary from index
  1919.             productIndex.removeProductSummary(summary);
  1920.         }

  1921.         // Remove from index
  1922.         productIndex.removeEvent(event);
  1923.     }

  1924.     /**
  1925.      * Removes the given summary from the Indexer ProductIndex and
  1926.      * ProductStorage.
  1927.      *
  1928.      * @param summary to remove
  1929.      * @throws Exception
  1930.      *             If errors occur while removing the summary
  1931.      */
  1932.     protected synchronized void removeSummary(ProductSummary summary)
  1933.             throws Exception {

  1934.         Event event = getPrevEvent(summary);
  1935.         if (event != null) {
  1936.             List<ProductSummary> eventProducts = event.getAllProductList();
  1937.             if (eventProducts != null && eventProducts.size() == 1
  1938.                     && eventProducts.get(0).getId().equals(summary.getId())) {
  1939.                 // last product for the event
  1940.                 removeEvent(event);
  1941.                 // product is already removed by removeEvent
  1942.                 return;
  1943.             }
  1944.         }

  1945.         // Remove product from storage
  1946.         productStorage.removeProduct(summary.getId());
  1947.         // Remove product summary from index
  1948.         productIndex.removeProductSummary(summary);

  1949.         // if product was associated to event need to update index
  1950.         if (event != null) {
  1951.             // remove the product from the event
  1952.             event.removeProduct(summary);

  1953.             // update event table
  1954.             ArrayList<Event> events = new ArrayList<Event>();
  1955.             events.add(event);
  1956.             productIndex.eventsUpdated(events);
  1957.         }
  1958.     }

  1959.     /**
  1960.      * Tries to create an event based on information in the given summary. If
  1961.      * successful, the summary is associated to the newly created event. Note:
  1962.      * The summary must have been externally added to the ProductIndex before
  1963.      * this method can be called.
  1964.      *
  1965.      * A product summary must have non-null (id) source and code, (location)
  1966.      * latitude and longitude, and (time) time, in order to have the minimum
  1967.      * properties required to create a new event.
  1968.      *
  1969.      * @param summary
  1970.      *            The product summary serving as the basis for the new event.
  1971.      * @return The event that is created, added and associated or null if the
  1972.      *         given summary can not be used to create a new event.
  1973.      * @throws Exception
  1974.      *             If the ProductIndex.addEvent throws an exception or if the
  1975.      *             ProductIndex.addAssociation throws an exception. This may
  1976.      *             happen if this method is called before the summary is added
  1977.      *             to the ProductIndex.
  1978.      */
  1979.     private synchronized Event createEvent(ProductSummary summary)
  1980.             throws Exception {
  1981.         if (Event.productHasOriginProperties(summary)) {
  1982.             Event event = productIndex.addEvent(new Event());
  1983.             return productIndex.addAssociation(event, summary);
  1984.         } else {
  1985.             return null;
  1986.         }
  1987.     }

  1988.     /**
  1989.      * Search for products in this index.
  1990.      *
  1991.      * @param request
  1992.      *            the search request.
  1993.      * @return the search response.
  1994.      * @throws Exception if error occurs
  1995.      */
  1996.     public synchronized SearchResponse search(SearchRequest request)
  1997.             throws Exception {
  1998.         SearchResponse response = new SearchResponse();

  1999.         // Execute each query
  2000.         Iterator<SearchQuery> iter = request.getQueries().iterator();
  2001.         while (iter.hasNext()) {
  2002.             SearchQuery query = iter.next();

  2003.             if (query instanceof EventsSummaryQuery) {
  2004.                 List<EventSummary> eventSummaries = new LinkedList<EventSummary>();
  2005.                 Iterator<Event> events = productIndex.getEvents(
  2006.                         query.getProductIndexQuery()).iterator();
  2007.                 // convert events to event summaries
  2008.                 while (events.hasNext()) {
  2009.                     Event event = events.next();
  2010.                     eventSummaries.add(event.getEventSummary());
  2011.                 }
  2012.                 ((EventsSummaryQuery) query).setResult(eventSummaries);
  2013.             }

  2014.             else if (query instanceof EventDetailQuery) {
  2015.                 List<Event> events = productIndex.getEvents(query
  2016.                         .getProductIndexQuery());
  2017.                 ((EventDetailQuery) query).setResult(events);
  2018.             }

  2019.             else if (query instanceof ProductsSummaryQuery) {
  2020.                 List<ProductSummary> products = productIndex.getProducts(query
  2021.                         .getProductIndexQuery());
  2022.                 ((ProductsSummaryQuery) query).setResult(products);
  2023.             }

  2024.             else if (query instanceof ProductDetailQuery) {
  2025.                 List<Product> products = new LinkedList<Product>();
  2026.                 Iterator<ProductId> ids = query.getProductIndexQuery()
  2027.                         .getProductIds().iterator();
  2028.                 // fetch products from storage
  2029.                 while (ids.hasNext()) {
  2030.                     ProductId id = ids.next();
  2031.                     Product product = productStorage.getProduct(id);
  2032.                     if (product != null) {
  2033.                         products.add(product);
  2034.                     }
  2035.                 }
  2036.                 ((ProductDetailQuery) query).setResult(products);
  2037.             }

  2038.             response.addResult(query);
  2039.         }

  2040.         return response;
  2041.     }

  2042.     /** @return disableArchive */
  2043.     public boolean isDisableArchive() {
  2044.         return disableArchive;
  2045.     }

    /**
     * Switches archiving off or on. While the flag is true,
     * purgeExpiredProducts returns without removing anything.
     *
     * @param disableArchive
     *            true to disable archiving, false to allow it
     */
    public void setDisableArchive(boolean disableArchive) {
        this.disableArchive = disableArchive;
    }

  2050.     /**
  2051.      * @return the archiveInterval
  2052.      */
  2053.     public long getArchiveInterval() {
  2054.         return archiveInterval;
  2055.     }

    /**
     * Sets the interval used as the repeat period when the archive task is
     * scheduled at startup.
     *
     * NOTE(review): presumably milliseconds, since the value is passed as the
     * period to archiveTimer.schedule - confirm archiveTimer is a
     * java.util.Timer. Whether a change made after startup affects an
     * already-scheduled task is not visible here.
     *
     * @param archiveInterval
     *            the archiveInterval to set
     */
    public void setArchiveInterval(long archiveInterval) {
        this.archiveInterval = archiveInterval;
    }

  2063.     /**
  2064.      * @return the archivePolicies
  2065.      */
  2066.     public List<ArchivePolicy> getArchivePolicies() {
  2067.         return archivePolicies;
  2068.     }

  2069. }