Indexer.java
- /*
- * Indexer
- */
- package gov.usgs.earthquake.indexer;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.earthquake.distribution.DefaultNotificationListener;
- import gov.usgs.earthquake.distribution.FileProductStorage;
- import gov.usgs.earthquake.distribution.HeartbeatListener;
- import gov.usgs.earthquake.distribution.Notification;
- import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
- import gov.usgs.earthquake.distribution.ProductStorage;
- import gov.usgs.earthquake.geoserve.ANSSRegionsFactory;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.earthquake.util.CompareUtil;
- import gov.usgs.util.Config;
- import gov.usgs.util.Configurable;
- import gov.usgs.util.FutureExecutorTask;
- import gov.usgs.util.StringUtils;
- import java.io.File;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * The indexer receives products from Distribution, and adds them to the
- * EventIndex.
- *
- * This class provides the following configurable properties (in addition to
- * those inherited from DefaultNotificationListener):
- * <dl>
- * <dt>associator</dt>
- * <dd>An object that implements the Associator interface.</dd>
- *
- * <dt>storage</dt>
- * <dd>An object that implements the ProductStorage interface.</dd>
- *
- * <dt>index</dt>
- * <dd>An object that implements the ProductIndex interface.</dd>
- *
- * <dt>modules</dt>
- * <dd>A comma delimited list of objects that implement the IndexerModule
- * interface</dd>
- *
- * <dt>listeners</dt>
- * <dd>A comma delimited list of objects that implement the IndexerListener
- * interface</dd>
- * </dl>
- */
- public class Indexer extends DefaultNotificationListener {
- /** Logging Utility **/
- private static final Logger LOGGER = Logger.getLogger(Indexer.class
- .getName());
- /** Preferred weight for persistent trump. */
- public static final long TRUMP_PREFERRED_WEIGHT = 100000000;
- /** Product type for version specific trump products. */
- public static final String TRUMP_PRODUCT_TYPE = "trump";
- /** Product type prefix for persistent trump products ("trump-PRODUCT"). */
- public static final String PERSISTENT_TRUMP_PREFIX = "trump-";
- /** Property name to configure a custom associator. */
- public static final String ASSOCIATOR_CONFIG_PROPERTY = "associator";
- /** Property name to configure whether to associate using current products. */
- public static final String ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY = "associateUsingCurrentProducts";
- /** Default value for associate using current products. */
- public static final String DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS = "false";
- /** Property name to configure a custom storage. */
- public static final String STORAGE_CONFIG_PROPERTY = "storage";
- /** Shortcut name to configure a file product storage. */
- public static final String STORAGE_DIRECTORY_CONFIG_PROPERTY = "storageDirectory";
- /** Property name to configure a custom index. */
- public static final String INDEX_CONFIG_PROPERTY = "index";
- /** Shortcut name to configure a sqlite index. */
- public static final String INDEXFILE_CONFIG_PROPERTY = "indexFile";
- /** Property name to configure modules. */
- public static final String MODULES_CONFIG_PROPERTY = "modules";
- /** Property name to configure listeners. */
- public static final String LISTENERS_CONFIG_PROPERTY = "listeners";
- /** Property name to configure local regions file. */
- public static final String LOCAL_REGIONS_PROPERTY = "localRegionsFile";
- /** Default path to local regions file. */
- public static final String DEFAULT_LOCAL_REGIONS = "regions.json";
- /** Property name to enable search socket. */
- public static final String ENABLE_SEARCH_PROPERTY = "enableSearch";
- /** Property name for search socket port. */
- public static final String SEARCH_PORT_PROPERTY = "searchPort";
- /** Property name for search socket thread pool size. */
- public static final String SEARCH_THREADS_PROPERTY = "searchThreads";
- /** Default value whether to enable search socket. */
- public static final String DEFAULT_ENABLE_SEARCH = "false";
- /** Default port where search socket listens. */
- public static final String DEFAULT_SEARCH_PORT = "11236";
- /** Default number of threads (concurrent searches) allowed. */
- public static final String DEFAULT_SEARCH_THREADS = "5";
- /** Utility used for associating products to events. */
- private Associator associator;
- /** Whether to associate using all products (false) or only current products (true). */
- private boolean associateUsingCurrentProducts = false;
- /** Where product contents are stored. */
- private ProductStorage productStorage;
- /** Index of stored products, and how they are related. */
- private ProductIndex productIndex;
- /** Read index for {@link #hasProductBeenIndexed(ProductId)} */
- private ProductIndex readProductIndex;
- /** Modules provide product specific functionality. */
- private List<IndexerModule> modules = new LinkedList<IndexerModule>();
- /** Listeners listen for changes to the event index; each has its own executor. */
- private Map<IndexerListener, ExecutorService> listeners = new HashMap<IndexerListener, ExecutorService>();
- /** Local file where regions are stored. */
- private File localRegionsFile = new File(DEFAULT_LOCAL_REGIONS);
- /** Timer for archive policy thread. */
- private Timer archiveTimer = null;
- /** Task for archive policy thread. */
- private TimerTask archiveTask = null;
- /** Synchronization object for indexing. */
- private final Object indexProductSync = new Object();
- /**
- * Service used by FutureExecutorTask for execution.
- * See distribution.FutureListenerNotifier for more details.
- */
- private ExecutorService backgroundService;
- /** Whether archive policies are disabled (true) or run (false). */
- private boolean disableArchive = false;
- // -- Configurable property names -- //
- /** Configurable property for index archive interval */
- public static final String INDEX_ARCHIVE_INTERVAL_PROPERTY = "archiveInterval";
- /** Configurable property for index archive policy */
- public static final String INDEX_ARCHIVE_POLICY_PROPERTY = "archivePolicy";
- // -- Default configurable property values -- //
- private static final long INDEX_ARCHIVE_INTERVAL_DEFAULT = 300000L;
- // -- Configured member variables. Values set in configure() method. -- //
- private long archiveInterval = 0;
- private List<ArchivePolicy> archivePolicies = null;
- private SearchServerSocket searchSocket = null;
- private DefaultIndexerModule defaultModule = new DefaultIndexerModule();
- /**
- * Creates an indexer wired with its default components: the default indexer
- * module, a DefaultAssociator, a FileProductStorage, a JDBCProductIndex, and
- * an empty list of archive policies.
- *
- * The Configurable API calls this no-arg constructor; configuration
- * parameters are applied later by the "configure" method.
- *
- * @throws Exception
- * if one of the default components cannot be created.
- */
- public Indexer() throws Exception {
- // register the built-in default module
- this.addModule(this.defaultModule);
- // default collaborators, replaceable through the setters
- this.associator = new DefaultAssociator();
- this.productStorage = new FileProductStorage();
- this.productIndex = new JDBCProductIndex();
- this.archivePolicies = new LinkedList<ArchivePolicy>();
- }
- /**
- * Gets the associator this indexer currently holds.
- *
- * @return the associator used to relate products to one another and to
- * events.
- */
- public Associator getAssociator() {
- return this.associator;
- }
- /**
- * Replaces the associator used to relate products to one another and to
- * events.
- *
- * @param associator
- * the associator to use from now on.
- */
- public void setAssociator(Associator associator) {
- // parameter shadows the field, so qualify the field explicitly
- this.associator = associator;
- }
- /**
- * Gets the storage component where received products are stored.
- *
- * @return the product storage this indexer currently holds.
- */
- public ProductStorage getProductStorage() {
- return this.productStorage;
- }
- /**
- * Replaces the storage component where received products are stored.
- *
- * @param productStorage
- * the product storage to use from now on.
- */
- public void setProductStorage(ProductStorage productStorage) {
- // parameter shadows the field, so qualify the field explicitly
- this.productStorage = productStorage;
- }
- /**
- * Gets the index component where product information is indexed as it is
- * received.
- *
- * @return the product index this indexer currently holds.
- */
- public ProductIndex getProductIndex() {
- return this.productIndex;
- }
- /**
- * Replaces the index component where product information is indexed as it
- * is received.
- *
- * @param productIndex
- * the product index to use from now on.
- */
- public void setProductIndex(ProductIndex productIndex) {
- // parameter shadows the field, so qualify the field explicitly
- this.productIndex = productIndex;
- }
- /**
- * Appends an indexer module to the list of modules this indexer consults
- * when handling products.
- *
- * @param toAdd
- * the IndexerModule to append.
- */
- public void addModule(final IndexerModule toAdd) {
- this.modules.add(toAdd);
- }
- /**
- * Drops the first occurrence of an indexer module from the list of known
- * modules.
- *
- * @param toRemove
- * the IndexerModule to drop.
- * @see java.util.LinkedList#remove(Object)
- */
- public void removeModule(final IndexerModule toRemove) {
- this.modules.remove(toRemove);
- }
- /**
- * This method checks each module's support level for the given product,
- * returning the first module with the highest support level.
- *
- * @param product
- * the product to summarize.
- * @return module best suited to summarize product.
- */
- protected IndexerModule getModule(final Product product) {
- // mit is the module fetched off the iterator
- // m is the module to return
- IndexerModule mit = null, m = null;
- Iterator<IndexerModule> it = modules.iterator();
- // If there are no known modules, then null. Oops. :)
- if (it.hasNext()) {
- // Use first module so long as it exists
- m = it.next();
- // Check remaining modules if any offer greater support
- while (it.hasNext()) {
- mit = it.next();
- // We use strictly greater than (no equals)
- if (mit.getSupportLevel(product) > m.getSupportLevel(product)) {
- m = mit;
- }
- }
- }
- return m;
- }
- /**
- * Registers a listener with this indexer, giving it a dedicated
- * single-thread executor. Listeners are notified when an event is added,
- * updated, or deleted, or when a new product arrives and is un-associated
- * to an event. Registering an already known listener does nothing.
- *
- * @param toAdd
- * The IndexerListener to add
- */
- public void addListener(final IndexerListener toAdd) {
- if (listeners.containsKey(toAdd)) {
- // already registered, keep its existing executor
- return;
- }
- listeners.put(toAdd, Executors.newSingleThreadExecutor());
- }
- /**
- * Removes a listener from this indexer and shuts down the executor that was
- * dedicated to it. Listeners are notified when an event is added, updated,
- * or deleted, or when a new product arrives and is un-associated to an
- * event. Removing a listener that was never added does nothing.
- *
- * Only the removed listener's own executor is shut down; the background
- * service is shared by every listener notification and is left running.
- *
- * @param toRemove
- * The IndexerListener to remove
- */
- public void removeListener(final IndexerListener toRemove) {
- // Remove listener from map
- ExecutorService listenerExecutor = listeners.remove(toRemove);
- if (listenerExecutor != null) {
- // Shutdown executor thread
- listenerExecutor.shutdown();
- }
- }
- /**
- * Logs an indexer event, then hands it to every registered IndexerListener.
- *
- * Each listener is called through a FutureExecutorTask submitted to that
- * listener's own executor, so this method usually returns before the
- * listeners have finished processing the event.
- *
- * @param event
- * The event that occurred to trigger the notification. Note: An
- * IndexerEvent has a specific "type" to clarify the type of
- * event that occurred.
- */
- protected synchronized void notifyListeners(final IndexerEvent event) {
- // describe every change carried by the event, one per line, for the log
- final StringBuilder changeLog = new StringBuilder();
- for (final IndexerChange change : event.getIndexerChanges()) {
- changeLog.append("\n").append(change.getType().toString()).append(" ");
- changeLog.append(change.getOriginalEvent() == null
- ? "null" : change.getOriginalEvent().getEventId());
- changeLog.append(" => ");
- changeLog.append(change.getNewEvent() == null
- ? "null" : change.getNewEvent().getEventId());
- }
- // event.getSummary may be null, fall back to the first event's id product
- ProductSummary loggedSummary = event.getSummary();
- if (loggedSummary == null && !event.getEvents().isEmpty()) {
- loggedSummary = event.getEvents().get(0).getEventIdProduct();
- }
- if (loggedSummary == null) {
- LOGGER.log(Level.FINE, "[" + getName()
- + "] event summary was null. This probably means the archive "
- + "policy is notifying of an archived event.");
- } else {
- LOGGER.log(Level.INFO, "[" + getName() + "] indexed product id="
- + loggedSummary.getId().toString()
- + ", status=" + loggedSummary.getStatus()
- + changeLog.toString());
- }
- // queue one task per listener on that listener's own executor
- for (final Map.Entry<IndexerListener, ExecutorService> registration : listeners.entrySet()) {
- final IndexerListener listener = registration.getKey();
- final ExecutorService listenerExecutor = registration.getValue();
- final FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(
- backgroundService, listenerExecutor, listener.getMaxTries(),
- listener.getTimeout(), new IndexerListenerCallable(listener,
- event));
- listenerExecutor.submit(listenerTask);
- }
- }
- /**
- * Check whether this product is in the index.
- *
- * NOT synchronized to allow multiple threads to access.
- * readProductIndex.hasProduct is synchronized.
- *
- * @param id ProductId to check
- * @return true if product has already been indexed.
- */
- protected boolean hasProductBeenIndexed(final ProductId id) {
- try {
- if (readProductIndex == productIndex) {
- // synchronize on this if read and product index are same
- synchronized (indexProductSync) {
- readProductIndex.beginTransaction();
- try {
- boolean hasProduct = readProductIndex.hasProduct(id);
- readProductIndex.commitTransaction();
- return hasProduct;
- } catch (Exception e) {
- readProductIndex.rollbackTransaction();
- }
- }
- } else {
- // otherwise synchronize on readProductIndex
- // transaction reconnects if needed
- synchronized (readProductIndex) {
- readProductIndex.beginTransaction();
- try {
- boolean hasProduct = readProductIndex.hasProduct(id);
- readProductIndex.commitTransaction();
- return hasProduct;
- } catch (Exception e) {
- readProductIndex.rollbackTransaction();
- }
- }
- }
- } catch (Exception wtf) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception checking if product already indexed", wtf);
- }
- // default is it hasn't been processed
- return false;
- }
- /**
- * Extends the DefaultNotificationListener accept check so products that may
- * affect event association are always processed.
- *
- * When the inherited check rejects a product and actual (non-scenario)
- * products are being processed, origin, associate, disassociate, and
- * trump-prefixed product types are accepted anyway.
- *
- * @param id
- * the product id to check.
- * @return boolean
- * whether the product should be indexed.
- */
- @Override
- public boolean accept(final ProductId id) {
- if (super.accept(id)) {
- return true;
- }
- if (!isIncludeActuals()) {
- // not processing non-scenario products, keep the inherited rejection
- return false;
- }
- // automatically accept products that affect association
- final String type = id.getType();
- return Event.ORIGIN_PRODUCT_TYPE.equals(type)
- || Event.ASSOCIATE_PRODUCT_TYPE.equals(type)
- || Event.DISASSOCIATE_PRODUCT_TYPE.equals(type)
- || type.startsWith(TRUMP_PRODUCT_TYPE);
- }
- /**
- * Check whether to skip products that have already been indexed.
- */
- @Override
- protected boolean onBeforeProcessNotification(Notification notification) throws Exception {
- // try to short-circuit duplicates
- if (!isProcessDuplicates() && hasProductBeenIndexed(notification.getProductId())) {
- LOGGER.finer(
- "[" + getName() + "] notification already indexed, skipping "
- + notification.getProductId().toString());
- return false;
- }
- // otherwise, use default behavior
- return super.onBeforeProcessNotification(notification);
- }
- /**
- * Receives a product from Product Distribution and adds it to the index.
- *
- * Implementation follows from Product Indexer Diagram (pg.10) of
- * ProductIndexer.pdf document dated 09/09/2010.
- *
- * Delegates to onProduct(product, false), so products that were already
- * processed are not reprocessed.
- *
- * @param product
- * The product triggering the event.
- * @throws Exception
- * if an exception occurs.
- */
- @Override
- public void onProduct(final Product product) throws Exception {
- // force=false: skip products that are already indexed
- this.onProduct(product, false);
- }
- /**
- * Stores, summarizes, and indexes a product, optionally reprocessing a
- * product that has already been processed.
- *
- * Indexing runs while holding the indexing lock; the total processing time
- * and the time spent waiting for that lock are logged when indexing ends,
- * whether or not it succeeded.
- *
- * @param product
- * The product triggering the event.
- * @param force
- * Whether to reprocess products that have already been processed
- * (true), or skip (false).
- * @throws Exception if error occurs
- */
- public void onProduct(final Product product, final boolean force) throws Exception {
- final ProductId id = product.getId();
- final long startMillis = System.currentTimeMillis();
- // -- Step 1: Store product; null means it was already indexed
- if (storeProduct(product, force) == null) {
- return;
- }
- // -- Step 2: Use product module to summarize product
- LOGGER.finer("[" + getName() + "] summarizing product id=" + id.toString());
- final ProductSummary summary = summarizeProduct(product);
- // -- Step 3: Add product summary to the product index
- LOGGER.finer("[" + getName() + "] indexing product id=" + id.toString());
- // measure time waiting to enter synchronized block
- final long lockRequestedMillis = System.currentTimeMillis();
- synchronized (indexProductSync) {
- final long lockAcquiredMillis = System.currentTimeMillis();
- try {
- indexProduct(summary);
- } finally {
- final long totalMillis = System.currentTimeMillis() - startMillis;
- final long syncDelayMillis = lockAcquiredMillis - lockRequestedMillis;
- LOGGER.fine("[" + getName() + "] indexer processed product id="
- + id.toString() + " in " + totalMillis + " ms"
- + " (" + syncDelayMillis + " ms sync delay)");
- }
- }
- }
- /**
- * Stores a product in the indexer's product storage.
- *
- * When the product is already in storage, it is still returned for
- * (re)processing if force is set or if it has not been indexed yet.
- *
- * @param product Product to store
- * @param force if should skip already indexed check
- * @return Product if it should be processed, null if it was already in
- * storage and already indexed
- * @throws Exception if error occurs
- */
- public Product storeProduct(final Product product, final boolean force) throws Exception {
- final ProductId id = product.getId();
- final long startMillis = System.currentTimeMillis();
- try {
- LOGGER.finest("[" + getName() + "] storing product id=" + id.toString());
- productStorage.storeProduct(product);
- LOGGER.finest("[" + getName() + "] stored product id=" + id.toString());
- } catch (ProductAlreadyInStorageException paise) {
- LOGGER.finer("[" + getName()
- + "] product already in indexer storage, checking if indexed");
- if (!force) {
- if (hasProductBeenIndexed(id)) {
- LOGGER.fine("[" + getName() + "] product already indexed "
- + product.getId());
- // don't reindex for now
- return null;
- }
- } else {
- LOGGER.finer("[" + getName()
- + "] force=true skipping check, (re)process product");
- }
- }
- final long elapsedMillis = System.currentTimeMillis() - startMillis;
- LOGGER.fine("[" + getName() + "] indexer downloaded product id="
- + id.toString() + " in " + elapsedMillis + " ms");
- return product;
- }
- /**
- * Summarizes a product using the best available indexer module, as chosen
- * by getModule.
- *
- * @param product To summarize
- * @return A product summary
- * @throws Exception if error occurs
- */
- public ProductSummary summarizeProduct(final Product product) throws Exception {
- // pick the module with the highest support level and let it summarize
- return getModule(product).getProductSummary(product);
- }
- /**
- * Add product summary to product index, associate it with an event, and
- * notify listeners.
- *
- * Runs inside one product index transaction: finds the previous version of
- * the product and a candidate event, removes an equal existing summary so
- * the new one can take its place, adds the summary, creates an event or
- * associates to the existing one, checks for event splits and merges,
- * applies trump handling, and records the resulting indexer changes. If any
- * of these steps fail the transaction is rolled back, heartbeat messages
- * are sent, and the exception is rethrown.
- *
- * Split and merge checks are skipped for redundant products (see
- * {@link #isRedundantProduct(ProductSummary, ProductSummary)}), because
- * such products do not change the properties that influence association.
- *
- * Listeners are notified after the transaction commits; an exception while
- * notifying is logged and does not affect the result.
- *
- * @param productSummary to add
- * @return Summary added to index
- * @throws Exception if error occurs
- */
- protected synchronized ProductSummary indexProduct(
- ProductSummary productSummary) throws Exception {
- LOGGER.finest("[" + getName() + "] beginning index transaction");
- // The notification to be sent when we are finished with this product
- IndexerEvent notification = new IndexerEvent(this);
- notification.setIndex(getProductIndex());
- notification.setSummary(productSummary);
- // Start the product index transaction, only proceed if able
- productIndex.beginTransaction();
- try {
- LOGGER.finer("[" + getName() + "] finding previous version");
- // Check index for previous version of this product
- ProductSummary prevSummary = getPrevProductVersion(productSummary);
- LOGGER.finer("[" + getName() + "] finding previous event");
- Event prevEvent = null;
- boolean redundantProduct = isRedundantProduct(prevSummary, productSummary);
- if (!redundantProduct) {
- // Redundant products skip association queries and use the existing
- // product association performed in next branch (should be
- // associated already if "redundant").
- // Check index for existing event candidate
- prevEvent = getPrevEvent(productSummary, true);
- }
- // may be an update/delete to a product that previously associated
- // to an event, even though this product isn't associating on its
- // own
- if (prevSummary != null && prevEvent == null) {
- // see if prevSummary associated with an event
- ProductIndexQuery prevEventQuery = new ProductIndexQuery();
- prevEventQuery.getProductIds().add(prevSummary.getId());
- if (associateUsingCurrentProducts) {
- prevEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
- }
- List<Event> prevEvents = productIndex.getEvents(prevEventQuery);
- if (prevEvents.size() != 0) {
- // just use first (there can really only be one).
- prevEvent = prevEvents.get(0);
- }
- }
- // special handling to allow trump products to associate based on
- // a product link. Not used when eventsource/eventsourcecode set.
- if (prevEvent == null
- && productSummary.getId().getType().equals(TRUMP_PRODUCT_TYPE)
- && productSummary.getLinks().containsKey("product")
- && !productSummary.getStatus().equalsIgnoreCase(
- Product.STATUS_DELETE)) {
- // see if we can associate via another product
- ProductIndexQuery otherEventQuery = new ProductIndexQuery();
- otherEventQuery.getProductIds().add(
- ProductId.parse(productSummary.getLinks()
- .get("product").get(0).toString()));
- if (associateUsingCurrentProducts) {
- otherEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
- }
- List<Event> prevEvents = productIndex
- .getEvents(otherEventQuery);
- if (prevEvents.size() != 0) {
- // just use first (there can really only be one).
- prevEvent = prevEvents.get(0);
- }
- }
- // Add the summary to the index
- LOGGER.finer("[" + getName() + "] adding summary to index");
- if (prevSummary != null && prevSummary.equals(productSummary)) {
- // implied force=true, prevEvent!=null
- // remove the previous version of this product summary
- // so the new one can take its place
- if (prevEvent != null) {
- productIndex.removeAssociation(prevEvent, prevSummary);
- } else {
- LOGGER.fine("[" + getName()
- + "] reprocessing unassociated summary");
- }
- productIndex.removeProductSummary(prevSummary);
- }
- productSummary = productIndex.addProductSummary(productSummary);
- Event event = null;
- if (prevEvent == null) {
- // No existing event, try to create one and associate
- event = createEvent(productSummary);
- if (event != null) {
- LOGGER.finer("[" + getName() + "] created event indexid="
- + event.getIndexId());
- event.log(LOGGER);
- } else {
- LOGGER.finer("[" + getName()
- + "] unable to create event for product.");
- }
- } else {
- LOGGER.finer("[" + getName()
- + "] found existing event indexid="
- + prevEvent.getIndexId());
- prevEvent.log(LOGGER);
- // Existing event found associate to it
- event = productIndex.addAssociation(prevEvent, productSummary);
- }
- // Can't split or merge a non-existent event, and a redundant
- // product cannot change associations so the check is skipped
- if (prevEvent != null && event != null && !redundantProduct) {
- LOGGER.finer("[" + getName() + "] checking for event splits");
- // Check for event splits
- notification.addIndexerChanges(checkForEventSplits(
- productSummary, prevEvent, event));
- }
- // Is this a problem??? split may modify the event, and then
- // the unmodified version of that event is passed to merge???
- // If this is a problem, checkForEventSplits and checkForEventMerges
- // could be modified to accept the notification object (and add
- // changes to it) and return the potentially modified object by
- // reference.
- if (event != null && !redundantProduct) {
- LOGGER.finer("[" + getName() + "] checking for event merges");
- // Check for event merges
- notification.addIndexerChanges(checkForEventMerges(
- productSummary, prevEvent, event));
- }
- // see if this is a trump product that needs special processing.
- event = checkForTrump(event, productSummary, prevSummary);
- // Set our notification indexer changes if not set yet
- if (notification.getIndexerChanges().size() == 0) {
- if (prevEvent == null && event != null) {
- // No previous event, so event added.
- notification.addIndexerChange(new IndexerChange(
- IndexerChange.EVENT_ADDED, prevEvent, event));
- } else if (prevEvent != null && event != null) {
- // Previous existed so event updated.
- notification.addIndexerChange(new IndexerChange(event
- .isDeleted() ? IndexerChange.EVENT_DELETED
- : IndexerChange.EVENT_UPDATED, prevEvent, event));
- } else if (prevEvent == null && event == null) {
- // No event existed or could be created.
- if (prevSummary == null) {
- // No previous summary, product added.
- notification.addIndexerChange(new IndexerChange(
- IndexerChange.PRODUCT_ADDED, null, null));
- } else {
- // Previous summary existed. Product updated.
- notification
- .addIndexerChange(new IndexerChange(
- productSummary.isDeleted() ? IndexerChange.PRODUCT_DELETED
- : IndexerChange.PRODUCT_UPDATED,
- null, null));
- }
- }
- }
- LOGGER.finer("[" + getName()
- + "] updating event summary parameters");
- // update preferred event parameters in index
- productIndex.eventsUpdated(notification.getEvents());
- LOGGER.finer("[" + getName() + "] committing transaction");
- // Commit our changes to the index (after updating summary attrs)
- productIndex.commitTransaction();
- } catch (Exception e) {
- LOGGER.log(Level.FINE, "[" + getName() + "] rolling back transaction", e);
- // just rollback since it wasn't successful
- productIndex.rollbackTransaction();
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(),
- "index exception", productSummary.getId().toString());
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(),
- "index exception class", e.getClass().getName());
- throw e;
- }
- try {
- LOGGER.fine("[" + getName() + "] notifying listeners");
- // ---------------------------------------------------------//
- // -- Step 5: Notify listeners with our indexer event
- // ---------------------------------------------------------//
- notifyListeners(notification);
- } catch (Exception e) {
- // this doesn't affect success of index transaction...
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception while notifying listeners", e);
- }
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(),
- "indexed product", productSummary.getId().toString());
- // return summary after added to index
- return productSummary;
- }
- /**
- * Decides whether a new version of a product is redundant with its previous
- * version, meaning it would not affect event associations and the indexer
- * can skip split/merge steps.
- *
- * A product is never redundant when there is no previous version, when the
- * two summaries are equal, or when they are not the same product.
- *
- * @param previous previous version of product.
- * @param current current version of product.
- * @return true if products are equivalent for association purposes.
- */
- private boolean isRedundantProduct(final ProductSummary previous, final ProductSummary current) {
- if (previous == null) {
- return false;
- }
- if (previous.equals(current)) {
- return false;
- }
- if (!previous.getId().isSameProduct(current.getId())) {
- return false;
- }
- // redundant only when every property that would influence indexer
- // associations or preferred event properties is unchanged
- return previous.getPreferredWeight() == current.getPreferredWeight()
- && CompareUtil.nullSafeCompare(previous.getStatus(),
- current.getStatus()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventDepth(),
- current.getEventDepth()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventLatitude(),
- current.getEventLatitude()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventLongitude(),
- current.getEventLongitude()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventMagnitude(),
- current.getEventMagnitude()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventSource(),
- current.getEventSource()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventSourceCode(),
- current.getEventSourceCode()) == 0
- && CompareUtil.nullSafeCompare(previous.getEventTime(),
- current.getEventTime()) == 0;
- }
- /**
- * Check for, and handle incoming trump products.
- *
- * Handles version specific trump products, calls
- * {@link #checkForPersistentTrump(Event, ProductSummary, ProductSummary)}
- * to handle "persistent" trump products.
- *
- * VERSION SPECIFIC TRUMP
- *
- * Version specific trump products include:
- * - a link with relation "product" that is a product id urn.
- * - a property "weight" that defines the new preferred weight.
- *
- * Finds the associated product, resummarizes. If trump is deleted, product
- * is associated as is. If trump is not deleted, set's preferred weight
- * before reassociating.
- *
- * Preconditions:
- * <ul>
- * <li>The "trump" type product must associate with the correct event on its
- * own. The admin pages accomplish this by sending
- * eventsource/eventsourcecode of the associated product. This means that no
- * product without an eventsource/eventsourcecode property may be trumped.</li>
- * </ul>
- *
- * @param event
- * current event being updated.
- * @param productSummary
- * product summary associated with event.
- * @return updated event, or same event if not updated.
- * @throws Exception
- * @see {@link #checkForPersistentTrump(Event, ProductSummary, ProductSummary)}
- */
- private Event checkForTrump(Event event, ProductSummary productSummary,
- ProductSummary prevSummary) throws Exception {
- if (event == null) {
- return event;
- }
- String type = productSummary.getId().getType();
- if (type.equals(TRUMP_PRODUCT_TYPE)) {
- // version specific trump
- ProductId trumpedId = null;
- ProductSummary trumpedSummary = null;
- if (productSummary.isDeleted()) {
- // deleting version specific trump
- // reset preferred weight of reference product
- // (is is a link in previous version of trump)
- trumpedId = getTrumpedProductId(prevSummary);
- if (trumpedId == null) {
- LOGGER.warning("Unable to process trump delete, "
- + "missing 'product' link from previous version");
- return event;
- }
- trumpedSummary = getProductSummaryById(trumpedId);
- if (trumpedSummary == null) {
- // no matching product in index, possibly already deleted
- return event;
- }
- // resummarize product
- event = resummarizeProduct(event, trumpedSummary);
- } else {
- // updating product weight
- trumpedId = getTrumpedProductId(productSummary);
- Long weight = Long.valueOf(productSummary.getProperties().get(
- "weight"));
- if (trumpedId == null || weight == null) {
- LOGGER.warning("Invalid trump, "
- + "missing 'product' link or 'weight' property");
- return event;
- }
- trumpedSummary = getProductSummaryById(trumpedId);
- if (trumpedSummary == null) {
- // no matching product in index, possibly already deleted
- LOGGER.info("Unable to process trump, " + "product '"
- + trumpedId.toString() + "' not found");
- return event;
- }
- event = setSummaryWeight(event, trumpedSummary, weight);
- }
- } else {
- return checkForPersistentTrump(event, productSummary);
- }
- return event;
- }
- /**
- * Check for, and handle persistent trump products.
- *
- * PERSISTENT TRUMP
- *
- * Persistent trump products include:
- * - a type "trump-PRODUCT" where PRODUCT is the type of product receiving
- * trump.
- * - a property "trump-source" that is the source of product receiving trump.
- * - a property "trump-code" that is the code of product receiving trump.
- *
- * Steps:
- *
- * 1) Find preferred persistent trump product for product being associated
- * (may be a persistent trump product being associated)
- * 2) If a non-trump product being associated,
- * stop processing if not affected by trump.
- * 3) set TRUMP_PREFERRED_WEIGHT on product referenced by preferred
- * persistent trump product; resummarize any other product that has
- * TRUMP_PREFERRED_WEIGHT.
- *
- * Preconditions:
- * <ul>
- * <li>The "trump" type product must associate with the correct event on its
- * own. The admin pages accomplish this by sending
- * eventsource/eventsourcecode of the associated product.</li>
- * </ul>
- *
- * @param event
- * current event being updated.
- * @param productSummary
- * product summary associated with event.
- * @return updated event, or same event if not updated.
- * @throws Exception
- */
- private Event checkForPersistentTrump(final Event event,
- final ProductSummary productSummary) throws Exception {
- Event updatedEvent = event;
- // the type of product currently being indexed
- String type = productSummary.getType();
- // the "trump-TYPE" product type
- String persistentTrumpType = null;
- // whether productSummary is a persistent trump product
- boolean associatingTrump = false;
- // the product source receiving trump
- String trumpSource = null;
- // the product type receiving trump
- String trumpType = null;
- // the product code receiving trump
- String trumpCode = null;
- // determine persistentTrumpType and trumpType
- if (type.startsWith(PERSISTENT_TRUMP_PREFIX)) {
- // incoming trump product
- persistentTrumpType = type;
- trumpType = type.replace(PERSISTENT_TRUMP_PREFIX, "");
- associatingTrump = true;
- // always set persistent trump preferred weight to 1,
- // so most recent updateTime is most preferred
- updatedEvent = setSummaryWeight(updatedEvent, productSummary, 1L);
- } else {
- // incoming product, possibly affected by existing trump
- persistentTrumpType = PERSISTENT_TRUMP_PREFIX + type;
- trumpType = type;
- }
- // find active persistent trump product for type
- ProductSummary persistentTrump = updatedEvent.getPreferredProduct(
- persistentTrumpType);
- if (persistentTrump != null) {
- trumpSource = persistentTrump.getProperties().get("trump-source");
- trumpCode = persistentTrump.getProperties().get("trump-code");
- }
- // if a non-trump product is coming in,
- // only continue processing if it is affected by persistentTrump.
- // (otherwise weights should already be set)
- if (!associatingTrump &&
- !(productSummary.getSource().equals(trumpSource)
- && productSummary.getCode().equals(trumpCode))) {
- // not affected by trump
- return event;
- }
- // update products affected by trump
- List<ProductSummary> products = updatedEvent.getProducts(trumpType);
- if (products != null) {
- for (ProductSummary summary : products) {
- if (summary.getSource().equals(trumpSource)
- && summary.getCode().equals(trumpCode)) {
- // add trump to product
- updatedEvent = setSummaryWeight(updatedEvent, summary,
- TRUMP_PREFERRED_WEIGHT);
- } else if (summary.getPreferredWeight() == TRUMP_PREFERRED_WEIGHT) {
- // remove trump from previously trumped product.
- updatedEvent = resummarizeProduct(updatedEvent, summary);
- }
- }
- }
- // return updated event
- return updatedEvent;
- }
- /**
- * Get the productId referred to by a trump product.
- *
- * @param trumpSummary
- * trump product with reference to product id.
- * @return product id, or null if unable to parse product id.
- */
- protected ProductId getTrumpedProductId(final ProductSummary trumpSummary) {
- try {
- // use 'product' link from previous summary
- ProductId trumpedId = ProductId.parse(trumpSummary.getLinks().get(
- "product").get(0).toString());
- return trumpedId;
- } catch (Exception e) {
- return null;
- }
- }
- /**
- * Get a product summary object using its product id.
- *
- * @param id
- * id to find.
- * @return matching product summary or null.
- * @throws Exception if error occurs
- */
- protected ProductSummary getProductSummaryById(final ProductId id)
- throws Exception {
- ProductIndexQuery query = new ProductIndexQuery();
- query.getProductIds().add(id);
- List<ProductSummary> summaries = productIndex.getProducts(query);
- if (summaries.size() > 0) {
- return summaries.get(0);
- }
- return null;
- }
- /**
- * Update a product summary weight
- *
- * @param event
- * the event.
- * @param summary
- * the summary.
- * @param preferredWeight
- * the weight to set.
- * @return event with updated summary.
- * @throws Exception if error occurs
- */
- protected Event setSummaryWeight(Event event, ProductSummary summary,
- final Long preferredWeight) throws Exception {
- if (summary.getPreferredWeight() == preferredWeight) {
- // already set
- return event;
- }
- LOGGER.info("Setting product preferred weight "
- + summary.getId().toString() + ", weight "
- + summary.getPreferredWeight() + " (old) => " + preferredWeight
- + " (new)");
- // remove existing summary from event
- event = productIndex.removeAssociation(event, summary);
- productIndex.removeProductSummary(summary);
- // set custom weight
- summary.setPreferredWeight(preferredWeight);
- // add updated summary to event
- summary = productIndex.addProductSummary(summary);
- event = productIndex.addAssociation(event, summary);
- // return updated event
- return event;
- }
- /**
- * Resummarize a product within an event.
- *
- * @param event
- * the event.
- * @param summary
- * the summary.
- * @return event with updated summary.
- * @throws Exception if error occurs
- */
- protected Event resummarizeProduct(final Event event,
- final ProductSummary summary) throws Exception {
- Event updatedEvent = null;
- ProductSummary updatedSummary = null;
- // remove existing summary from event
- updatedEvent = productIndex.removeAssociation(event, summary);
- productIndex.removeProductSummary(summary);
- // use module to summarize original product
- Product product = productStorage.getProduct(summary.getId());
- if (product == null) {
- throw new Exception("Unable to resummarize product, "
- + "product not in storage " + summary.getId().toString());
- }
- updatedSummary = getModule(product).getProductSummary(product);
- LOGGER.info("Resummarizing product " + summary.getId().toString()
- + ", weight " + summary.getPreferredWeight() + " (old) => "
- + updatedSummary.getPreferredWeight() + " (new)");
- // add updated summary to event
- updatedSummary = productIndex.addProductSummary(updatedSummary);
- updatedEvent = productIndex
- .addAssociation(updatedEvent, updatedSummary);
- // return updated event
- return updatedEvent;
- }
- /**
- * Check for event splits (and split them if needed).
- *
- * @param summary
- * the summary the indexer is currently processing.
- * @param originalEvent
- * the event before the indexer made any changes.
- * @param updatedEvent
- * the event after the indexer made any changes.
- * @return List of changes made during this method.
- * @throws Exception if error occurs
- */
- protected synchronized List<IndexerChange> checkForEventSplits(
- final ProductSummary summary, final Event originalEvent,
- final Event updatedEvent) throws Exception {
- List<IndexerChange> changes = new ArrayList<IndexerChange>();
- // save reference so we can check later if this has changed
- Event splitEvent = updatedEvent;
- // ## 1) Split event into sub events
- Map<String, Event> subEvents = splitEvent.getSubEvents();
- if (subEvents.size() == 1) {
- // still only one event, cannot split
- return changes;
- }
- // the original eventid before the indexer started processing this
- // update (may have already changed in updatedEvent).
- String originalEventId = originalEvent.getEventId();
- // ## 2) See if sub events still associate
- // list of events that actually are split
- List<Event> alreadySplit = new ArrayList<Event>();
- // see how sub events associate compared to this event (since the
- // original event is the one that should be "UPDATED")
- Event originalSubEvent = subEvents.remove(originalEventId);
- if (originalSubEvent == null) {
- // wtf
- // this should always exist because its ID was returned by getEventId,
- // should should have at least one product. Log:
- LOGGER.warning("[" + getName() + "] originalSubEvent is null"
- + ", originalEventId=" + originalEventId);
- for (final String id: subEvents.keySet()) {
- final Event subEvent = subEvents.get(id);
- subEvent.log(LOGGER);
- }
- return changes;
- }
- Iterator<String> subEventsIter = new ArrayList<String>(
- subEvents.keySet()).iterator();
- while (subEventsIter.hasNext()) {
- String nextEventId = subEventsIter.next();
- Event nextEvent = subEvents.get(nextEventId);
- if (!originalSubEvent.isAssociated(nextEvent, associator)) {
- // not associated, so split
- splitEvent = splitEvents(splitEvent, nextEvent);
- // see if associated to any that already split
- Iterator<Event> alreadySplitIter = alreadySplit.iterator();
- while (alreadySplitIter.hasNext()) {
- Event alreadySplitEvent = alreadySplitIter.next();
- if (alreadySplitEvent.isAssociated(nextEvent, associator)) {
- // need to merge with event that already split
- // will need reference to alreadySplitEvent, so keep
- // reference to merged event in nextEvent
- nextEvent = mergeEvents(alreadySplitEvent, nextEvent);
- // remove original already split
- alreadySplit.remove(alreadySplitEvent);
- // add merged nextEvent
- alreadySplit.add(nextEvent);
- // signal that nextEvent was already added to
- // alreadySplit
- nextEvent = null;
- // associated, and one at a time, so stop checking
- break;
- }
- }
- if (nextEvent != null) {
- // wasn't merged with an already split event
- alreadySplit.add(nextEvent);
- }
- }
- }
- if (alreadySplit.size() == 0) {
- // didn't split any events...
- return changes;
- }
- // ## 3) Build list of Indexer changes that actually happened.
- String splitEventId = splitEvent.getEventId();
- if (!originalEventId.equalsIgnoreCase(splitEventId)) {
- LOGGER.warning("[" + getName() + "] eventid (" + splitEventId
- + ") no longer matches original (" + originalEventId
- + ") after split.");
- }
- // first notify about updated original event
- changes.add(new IndexerChange(
- (splitEvent.isDeleted() ? IndexerChange.EVENT_DELETED
- : IndexerChange.EVENT_UPDATED), originalEvent,
- splitEvent));
- // now notify about all events that split from original event
- Iterator<Event> alreadySplitIter = alreadySplit.iterator();
- while (alreadySplitIter.hasNext()) {
- Event alreadySplitEvent = alreadySplitIter.next();
- changes.add(new IndexerChange(IndexerChange.EVENT_SPLIT, null,
- alreadySplitEvent));
- }
- // done
- return changes;
- }
- /**
- * Removes the leaf event (and all its products) from the root event. This
- * method modifies the runtime objects as well as updating the index DB.
- *
- * @param root
- * The root event from which all leaf products will be removed
- * @param leaf
- * The event (with products) that will be removed from the root
- * @return copy of root without the products that have been removed. The
- * indexId property of leaf is updated to its new value.
- * @throws Exception if error occurs
- */
- protected synchronized Event splitEvents(final Event root, final Event leaf)
- throws Exception {
- Event updated = root;
- Iterator<ProductSummary> leafProducts = leaf.getProductList()
- .iterator();
- // assign leaf indexId by reference
- Event insertedLeafEvent = productIndex.addEvent(leaf);
- leaf.setIndexId(insertedLeafEvent.getIndexId());
- while (leafProducts.hasNext()) {
- ProductSummary product = leafProducts.next();
- if (updated != null) {
- updated = productIndex.removeAssociation(updated, product);
- }
- // leaf already has the product in its list, not returning anyways.
- productIndex.addAssociation(leaf, product);
- }
- return updated;
- }
- /**
- * Merges the child event (and all its products) into the target event. If
- * the child event attempts to merge in a product that is the same as one
- * already associated to the target event, the child version of the product
- * takes precedence. Note: This only applies when the target and child
- * product have the same type, code, source, and update time; i.e. the
- * products are duplicates. This method modifies the runtime objects as well
- * as the index DB. The child event is then deleted.
- *
- * @param target
- * The target event into which the child is merged.
- * @param child
- * The child event to be merged into the target.
- * @return the updated event
- * @throws Exception if error occurs
- */
- protected synchronized Event mergeEvents(final Event target,
- final Event child) throws Exception {
- Iterator<ProductSummary> childProducts = child.getProductList()
- .iterator();
- Event updatedEvent = target;
- Event updatedChild = child;
- while (childProducts.hasNext()) {
- ProductSummary product = childProducts.next();
- productIndex.removeAssociation(child, product);
- updatedChild = productIndex
- .removeAssociation(updatedChild, product);
- updatedEvent = productIndex.addAssociation(updatedEvent, product);
- }
- productIndex.removeEvent(updatedChild);
- return updatedEvent;
- }
- /**
- * Check and merge any nearby events or previously unassociated products
- * that now associate.
- *
- * @param summary
- * the summary currently being processed by the indexer.
- * @param originalEvent
- * the event before any changes.
- * @param updatedEvent
- * the event after the summary was associated.
- * @return list of any merge type changes.
- * @throws Exception if error occurs
- */
- protected synchronized List<IndexerChange> checkForEventMerges(
- final ProductSummary summary, final Event originalEvent,
- final Event updatedEvent) throws Exception {
- List<IndexerChange> changes = new ArrayList<IndexerChange>();
- Event mergedEvent = updatedEvent;
- // ## 1) Check for nearby events
- if (originalEvent != null) {
- // only if the event was not just created, because otherwise this
- // product would have associated to an existing event
- // build the query
- EventSummary mergedSummary = mergedEvent.getEventSummary();
- ProductIndexQuery nearbyEvents = associator.getLocationQuery(
- mergedSummary.getTime(), mergedSummary.getLatitude(),
- mergedSummary.getLongitude());
- if (associateUsingCurrentProducts && nearbyEvents != null) {
- nearbyEvents.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
- }
- LOGGER.finer("[" + getName() + "] searching for nearby events");
- // do the search
- Iterator<Event> events = productIndex.getEvents(nearbyEvents)
- .iterator();
- LOGGER.finer("[" + getName()
- + "] search for nearby events complete");
- while (events.hasNext()) {
- Event foundEvent = events.next();
- if (foundEvent.getIndexId().equals(mergedEvent.getIndexId())) {
- // found the event currently being checked for merges,
- // ignore
- continue;
- } else if (mergedEvent.isAssociated(foundEvent, associator)) {
- // event associates to another event, merge them
- mergedEvent = mergeEvents(mergedEvent, foundEvent);
- changes.add(new IndexerChange(IndexerChange.EVENT_MERGED,
- foundEvent, null));
- }
- }
- }
- // ## 2) Now look for products that were previously unassociated, but
- // that can now associate because of the event id of the incoming
- // product
- // (if the event already had this id, these products would already be
- // associated...)
- String source = summary.getEventSource();
- String sourceCode = summary.getEventSourceCode();
- // without this check, all unassociated products would be added...(BAD)
- if (source != null && sourceCode != null) {
- // build the query
- ProductIndexQuery unassociatedProducts = new ProductIndexQuery();
- unassociatedProducts.setEventSource(source);
- unassociatedProducts.setEventSourceCode(sourceCode);
- // run the query
- LOGGER.finer("[" + getName()
- + "] searching for unassociated products");
- Iterator<ProductSummary> summaries = productIndex
- .getUnassociatedProducts(unassociatedProducts).iterator();
- LOGGER.finer("[" + getName()
- + "] search for unassociated products complete");
- // add associations
- while (summaries.hasNext()) {
- mergedEvent = productIndex.addAssociation(mergedEvent,
- summaries.next());
- }
- }
- // ## 2.5) Check for merge by associate product
- // only need to check when associate product is first added
- // THIS IMPLEMENTATION ASSUMES: both events exist when associate product
- // is sent. Search for existing event (during getPrevEvent) does not
- // search associate products othereventsource or othereventsourcecode
- // properties.
- if (summary.getType().equals(Event.ASSOCIATE_PRODUCT_TYPE)
- && !summary.isDeleted()) {
- String otherEventSource = summary.getProperties().get(
- Event.OTHEREVENTSOURCE_PROPERTY);
- String otherEventSourceCode = summary.getProperties().get(
- Event.OTHEREVENTSOURCECODE_PROPERTY);
- if (otherEventSource == null || otherEventSourceCode == null) {
- LOGGER.warning(Event.ASSOCIATE_PRODUCT_TYPE
- + " product without " + Event.OTHEREVENTSOURCE_PROPERTY
- + " or " + Event.OTHEREVENTSOURCECODE_PROPERTY
- + " properties, ignoring");
- } else {
- // search for associated event
- ProductIndexQuery associateQuery = new ProductIndexQuery();
- associateQuery.setEventSource(otherEventSource);
- associateQuery.setEventSourceCode(otherEventSourceCode);
- if (associateUsingCurrentProducts) {
- associateQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
- }
- LOGGER.finer("[" + getName()
- + "] searching for associated event");
- // do the search
- Iterator<Event> events = productIndex.getEvents(associateQuery)
- .iterator();
- LOGGER.finer("[" + getName()
- + "] search for associated event complete");
- while (events.hasNext()) {
- Event foundEvent = events.next();
- if (foundEvent.getIndexId()
- .equals(mergedEvent.getIndexId())) {
- // found the event currently being checked for merges,
- // ignore
- continue;
- } else if (mergedEvent.isAssociated(foundEvent)) {
- // event associates to another event, merge them
- mergedEvent = mergeEvents(mergedEvent, foundEvent);
- changes.add(new IndexerChange(
- IndexerChange.EVENT_MERGED, foundEvent, null));
- }
- }
- }
- }
- // ## 4) Check if the event has changed during this method
- if (mergedEvent != updatedEvent) {
- // something has changed, so add an IndexerChange
- if (originalEvent == null) {
- // no previous event, it was added (although unassociated
- // products were associated)
- changes.add(new IndexerChange(IndexerChange.EVENT_ADDED, null,
- mergedEvent));
- } else {
- // may have merged with other events, or associated unassociated
- // products. Other changes represent the merges, so just
- // indicate update/delete.
- changes.add(new IndexerChange(
- (mergedEvent.isDeleted() ? IndexerChange.EVENT_DELETED
- : IndexerChange.EVENT_UPDATED), originalEvent,
- mergedEvent));
- }
- }
- return changes;
- }
- /**
- * Takes a summary return the previous
- * @param summary A product summary
- * @return The previous summary
- * @throws Exception if error occurs
- */
- protected synchronized ProductSummary getPrevProductVersion(
- ProductSummary summary) throws Exception {
- ProductSummary prevSummary = null;
- List<ProductSummary> candidateSummaries = null;
- ProductIndexQuery query = new ProductIndexQuery();
- // Set type, code and source
- query.setProductType(summary.getType());
- query.setProductCode(summary.getCode());
- query.setProductSource(summary.getSource());
- // Query the index (first look for associated products)
- candidateSummaries = productIndex.getProducts(query);
- if (candidateSummaries == null || candidateSummaries.size() == 0) {
- // No summaries found associated to events, try unassociated.
- candidateSummaries = productIndex.getUnassociatedProducts(query);
- }
- if (candidateSummaries != null && candidateSummaries.size() > 0) {
- prevSummary = candidateSummaries.get(0);
- if (candidateSummaries.size() != 1) {
- LOGGER.warning(
- "[" + getName() + "] " + summary.getId().toString() +
- ": More than one existing summary is claiming to be most recent.");
- }
- }
- return prevSummary;
- }
- /**
- * Associate products are processed during
- * {@link #checkForEventMerges(ProductSummary, Event, Event)} and are
- * ignored during this method.
- *
- * @see Associator#getSearchRequest(ProductSummary)
- * @see Associator#chooseEvent(List, ProductSummary)
- *
- * @param summary ProductSummary
- * @return Event to which a productSummary is associated, or null if not
- * found.
- * @throws Exception if error occurs
- */
- protected synchronized Event getPrevEvent(ProductSummary summary)
- throws Exception {
- return getPrevEvent(summary, false);
- }
- /**
- * Find an existing event that summary should associate with.
- *
- * @param summary the previous event.
- * @param associating whether associating (vs archiving).
- * @return previous event, or null if none found.
- * @throws Exception if error occurs
- */
- protected synchronized Event getPrevEvent(ProductSummary summary,
- boolean associating) throws Exception {
- Event prevEvent = null;
- List<Event> candidateEvents = null;
- SearchRequest request = associator.getSearchRequest(summary);
- if (associating && associateUsingCurrentProducts) {
- for (SearchQuery query : request.getQueries()) {
- query.getProductIndexQuery().setResultType(
- ProductIndexQuery.RESULT_TYPE_CURRENT);
- }
- }
- SearchResponse response = search(request);
- if (response != null) {
- candidateEvents = response.getEvents();
- }
- if (candidateEvents != null && candidateEvents.size() > 0) {
- // Found some events. Find best match.
- prevEvent = associator.chooseEvent(candidateEvents, summary);
- }
- return prevEvent;
- }
- /*
- * protected IndexerEvent createIndexerEvent(ProductSummary prevSummary,
- * Event prevEvent, ProductSummary summary, Event event) { IndexerType type
- * = null; IndexerEvent indexerEvent = new IndexerEvent(this);
- *
- * // ---------------------------------- // Determine the type if
- * IndexerEvent // ----------------------------------
- *
- * if (summary.getStatus() == Product.STATUS_DELETE) { type =
- * IndexerEvent.PRODUCT_DELETED; if (event != null) { // Since we have an
- * event, this is now an EVENT_UPDATED type type =
- * IndexerEvent.EVENT_UPDATED;
- *
- * // Check if all products on event are deleted. if
- * (event.getProductList().size() == 0) { type = IndexerEvent.EVENT_DELETED;
- * } } } else { // Product was not a "DELETE" status. Must be an added or
- * updated. if (prevEvent == null && event != null) { type =
- * IndexerEvent.EVENT_ADDED; } else if (prevEvent != null && event != null)
- * { type = IndexerEvent.EVENT_UPDATED; } else if (prevSummary == null &&
- * summary != null) { type = IndexerEvent.PRODUCT_ADDED; } else if
- * (prevSummary != null && summary != null) { type =
- * IndexerEvent.PRODUCT_UPDATED; }
- *
- * if (summary == null) { // Not sure how this happens.
- * LOGGER.warning("Trying to notify of a null summary."); } }
- *
- * // Set parameters indexerEvent.setEventType(type);
- * indexerEvent.setOldEvent(prevEvent); indexerEvent.setSummary(summary);
- * indexerEvent.setEvent(event);
- *
- * return indexerEvent; }
- */
- /**
- * Loads parent, specific, and dependent configurations; in that order.
- */
- @Override
- public synchronized void configure(Config config) throws Exception {
- // -- Load parent configurations -- //
- super.configure(config);
- // reads properties from same config section
- defaultModule.getSignatureVerifier().configure(config);
- // -- Load specific configurations -- //
- String associatorName = config.getProperty(ASSOCIATOR_CONFIG_PROPERTY);
- if (associatorName != null) {
- associator = (Associator) Config.getConfig().getObject(
- associatorName);
- }
- String storageName = config.getProperty(STORAGE_CONFIG_PROPERTY);
- String storageDirectory = config
- .getProperty(STORAGE_DIRECTORY_CONFIG_PROPERTY);
- if (storageName != null) {
- LOGGER.config("[" + getName() + "] loading ProductStorage '"
- + storageName + "'");
- productStorage = (ProductStorage) Config.getConfig().getObject(
- storageName);
- if (productStorage == null) {
- throw new ConfigurationException("[" + getName()
- + "] ProductStorage '" + storageName
- + "' is not properly configured");
- }
- } else if (storageDirectory != null) {
- LOGGER.config("[" + getName() + "] using storage directory '"
- + storageDirectory + "'");
- productStorage = new FileProductStorage(new File(storageDirectory));
- } else {
- productStorage.configure(config);
- }
- String indexName = config.getProperty(INDEX_CONFIG_PROPERTY);
- String indexFileName = config.getProperty(INDEXFILE_CONFIG_PROPERTY);
- if (indexName != null) {
- LOGGER.config("[" + getName() + "] loading ProductIndex '"
- + indexName + "'");
- productIndex = (ProductIndex) Config.getConfig().getObject(
- indexName);
- if (productIndex == null) {
- throw new ConfigurationException("[" + getName()
- + "] ProductIndex '" + indexName
- + "' is not properly configured");
- }
- } else if (indexFileName != null) {
- LOGGER.config("[" + getName() + "] using sqlite product index '"
- + indexFileName + "'");
- productIndex = new JDBCProductIndex(indexFileName);
- } else {
- productIndex.configure(config);
- }
- // How often to check for expired products
- String archivePolicy = config
- .getProperty(INDEX_ARCHIVE_POLICY_PROPERTY);
- if (archivePolicy != null) {
- Iterator<String> iter = StringUtils.split(archivePolicy, ",")
- .iterator();
- while (iter.hasNext()) {
- String policyName = iter.next();
- LOGGER.config("[" + getName() + "] loading ArchivePolicy '"
- + policyName + "'");
- ArchivePolicy policy = (ArchivePolicy) Config.getConfig()
- .getObject(policyName);
- if (policy == null) {
- throw new ConfigurationException("[" + getName()
- + "] ArchivePolicy '" + policyName
- + "' is not configured properly");
- }
- // Only use archive policies that are valid
- if (policy.isValidPolicy()) {
- archivePolicies.add(policy);
- } else {
- LOGGER.warning("[" + getName() + "] ArchivePolicy '"
- + policyName + "' is not valid");
- }
- }
- }
- // How often should the archive policies be run
- String buffer = config.getProperty(INDEX_ARCHIVE_INTERVAL_PROPERTY);
- if (buffer != null) {
- archiveInterval = Long.parseLong(buffer);
- } else {
- // Use default age
- archiveInterval = INDEX_ARCHIVE_INTERVAL_DEFAULT;
- }
- LOGGER.config("[" + getName() + "] archive interval is '"
- + archiveInterval + "'");
- // Always use at least a default indexer module
- String moduleNames = config.getProperty(MODULES_CONFIG_PROPERTY);
- if (moduleNames != null) {
- Iterator<String> modules = StringUtils.split(moduleNames, ",")
- .iterator();
- while (modules.hasNext()) {
- String moduleName = modules.next();
- if (moduleName.equals("")) {
- continue;
- }
- LOGGER.config("[" + getName() + "] loading indexer module '"
- + moduleName + "'");
- IndexerModule module = (IndexerModule) Config.getConfig()
- .getObject(moduleName);
- if (module == null) {
- throw new ConfigurationException("[" + getName()
- + "] indexer module '" + moduleName
- + "' is not configured properly");
- }
- addModule(module);
- }
- } else {
- LOGGER.config("[" + getName() + "] no indexer modules configured.");
- }
- String listenerNames = config.getProperty(LISTENERS_CONFIG_PROPERTY);
- if (listenerNames != null) {
- Iterator<String> listeners = StringUtils.split(listenerNames, ",")
- .iterator();
- while (listeners.hasNext()) {
- String listenerName = listeners.next();
- if (listenerName.equals("")) {
- continue;
- }
- LOGGER.config("[" + getName() + "] loading indexer listener '"
- + listenerName + "'");
- IndexerListener listener = (IndexerListener) Config.getConfig()
- .getObject(listenerName);
- if (listener == null) {
- throw new ConfigurationException("[" + getName()
- + "] indexer listener '" + listenerName
- + "' is not configured properly");
- }
- addListener(listener);
- }
- } else {
- LOGGER.config("[" + getName()
- + "] no indexer listeners configured.");
- }
- String localRegions = config.getProperty(LOCAL_REGIONS_PROPERTY,
- DEFAULT_LOCAL_REGIONS);
- this.localRegionsFile = new File(localRegions);
- LOGGER.config("[" + getName() + "] Local regions file: "
- + this.localRegionsFile);
- String enableSearch = config.getProperty(ENABLE_SEARCH_PROPERTY,
- DEFAULT_ENABLE_SEARCH);
- if (Boolean.valueOf(enableSearch)) {
- searchSocket = new SearchServerSocket();
- searchSocket.setIndex(this);
- int searchPort = Integer.parseInt(config.getProperty(
- SEARCH_PORT_PROPERTY, DEFAULT_SEARCH_PORT));
- searchSocket.setPort(searchPort);
- int searchThreads = Integer.parseInt(config.getProperty(
- SEARCH_THREADS_PROPERTY, DEFAULT_SEARCH_THREADS));
- searchSocket.setThreads(searchThreads);
- LOGGER.config("[" + getName()
- + "] SearchServerSocket running at localhost:" + searchPort
- + ", with " + searchThreads + " threads");
- }
- // -- Load dependent configurations -- //
- associateUsingCurrentProducts = Boolean.valueOf(
- config.getProperty(ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY,
- DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS));
- LOGGER.config("[" + getName() + "] associateUsingCurrentProducts = "
- + associateUsingCurrentProducts);
- }
- /**
- * Shuts down the Indexer. The parent shutdown method is called and then all
- * executor services (from listeners) are shutdown in sequence.
- */
- @Override
- public synchronized void shutdown() throws Exception {
- // -- Shut down dependent processes -- //
- try {
- if (readProductIndex != productIndex) {
- readProductIndex.shutdown();
- }
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception shutting down read product index", e);
- }
- try {
- productIndex.shutdown();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception shutting down product index", e);
- }
- productStorage.shutdown();
- // ExecutorServices tied to known listeners.
- Iterator<IndexerListener> iter = listeners.keySet().iterator();
- while (iter.hasNext()) {
- IndexerListener listener = iter.next();
- try {
- listeners.get(listener).shutdown();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception shutting down listener executor", e);
- }
- if (listener instanceof Configurable) {
- try {
- ((Configurable) listener).shutdown();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception shutting down listener", e);
- }
- }
- }
- Iterator<IndexerModule> modules = this.modules.iterator();
- while (modules.hasNext()) {
- IndexerModule module = modules.next();
- if (module instanceof Configurable) {
- try {
- ((Configurable) module).shutdown();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception shutting down module", e);
- }
- }
- }
- // -- Shut down our own specific processes -- //
- // Shut down our timers if they exist
- if (archiveTask != null) {
- archiveTask.cancel();
- archiveTask = null;
- }
- if (archiveTimer != null) {
- archiveTimer.cancel();
- archiveTimer = null;
- }
- if (searchSocket != null) {
- searchSocket.shutdown();
- }
- // -- Call parent shutdown method -- //
- super.shutdown();
- }
- /**
- * Starts up the necessary parent, specific, and dependent processes, in
- * that order.
- */
- @Override
- public synchronized void startup() throws Exception {
- // -- Call parent startup method -- //
- super.startup();
- // -- Start up our own specific processes -- //
- backgroundService = Executors.newCachedThreadPool();
- // -- Start dependent processes -- //
- // ExecutorServices tied to known listeners.
- Iterator<IndexerListener> iter = listeners.keySet().iterator();
- while (iter.hasNext()) {
- IndexerListener listener = iter.next();
- if (listener instanceof Configurable) {
- ((Configurable) listener).startup();
- }
- }
- // configure regions factory before modules
- ANSSRegionsFactory factory = ANSSRegionsFactory.getFactory(false);
- factory.setLocalRegions(localRegionsFile);
- factory.startup();
- Iterator<IndexerModule> modules = this.modules.iterator();
- while (modules.hasNext()) {
- IndexerModule module = modules.next();
- if (module instanceof Configurable) {
- ((Configurable) module).startup();
- }
- }
- // ProductIndex
- productStorage.startup();
- productIndex.startup();
- // if using mysql product index, create separate read index
- readProductIndex = null;
- if (productIndex instanceof JDBCProductIndex) {
- JDBCProductIndex jdbcProductIndex = (JDBCProductIndex) productIndex;
- if (jdbcProductIndex.getDriver().contains("mysql")) {
- readProductIndex = new JDBCProductIndex();
- ((JDBCProductIndex) readProductIndex).setDriver(jdbcProductIndex.getDriver());
- ((JDBCProductIndex) readProductIndex).setUrl(jdbcProductIndex.getUrl());
- readProductIndex.startup();
- }
- }
- if (readProductIndex == null) {
- // otherwise use same index
- readProductIndex = productIndex;
- }
- // Cleanup thread to purge old products
- if (archivePolicies.size() > 0) {
- // Instantiate a timer object
- archiveTimer = new Timer();
- // Instantiate the task object
- archiveTask = new TimerTask() {
- public void run() {
- try {
- int[] counts = purgeExpiredProducts();
- LOGGER.info(String
- .format("["
- + getName()
- + "] purged %d expired events and %d expired unassociated products.",
- counts[0], counts[1]));
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] indexer cleanup thread threw exception",
- ex);
- }
- }
- };
- // Run archiver immediately at startup, then at regular intervals
- archiveTimer.schedule(archiveTask, 0L, archiveInterval);
- }
- if (searchSocket != null) {
- searchSocket.startup();
- }
- }
- /**
- * Checks the index for content that match a configured archive policy.
- * Events are checked first and matched events are removed along with all
- * their products. Listeners are notified of each archived event with an
- * EVENT_ARCHIVED type. Unassociated products are checked next, matched
- * unassociated products are archived and listeners are notified with
- * PRODUCT_ARCHIVE type.
- *
- * Note: Product "age" is determined by when the earthquake for that product
- * occurred and does not reflect how long the product has actually been in
- * the index.
- *
- * @see #archivePolicies
- * @return Int array of size 2
- * @throws Exception if error occurs
- */
- public synchronized int[] purgeExpiredProducts() throws Exception {
- int[] counts = { 0, 0 };
- ProductIndexQuery query = null;
- ArchivePolicy policy = null;
- if (isDisableArchive()) {
- LOGGER.info("Archiving disabled");
- return counts;
- }
- for (int i = 0; i < archivePolicies.size(); i++) {
- policy = archivePolicies.get(i);
- query = policy.getIndexQuery();
- if (!(policy instanceof ProductArchivePolicy)) {
- // -- Purge expired events for this policy -- //
- LOGGER.fine("[" + getName()
- + "] running event archive policy (" + policy.getName()
- + ")");
- try {
- // Get a list of those events
- List<Event> expiredEvents = productIndex.getEvents(query);
- // Loop over list of expired events and remove each one
- Iterator<Event> eventIter = expiredEvents.iterator();
- while (eventIter.hasNext()) {
- Event event = eventIter.next();
- LOGGER.info("[" + getName() + "] archiving event "
- + event.getEventId());
- event.log(LOGGER);
- productIndex.beginTransaction();
- try {
- removeEvent(event);
- // Notify of the event archived
- IndexerEvent notification = new IndexerEvent(this);
- notification.setSummary(null);
- notification.addIndexerChange(new IndexerChange(
- IndexerChange.EVENT_ARCHIVED, event, null));
- notifyListeners(notification);
- ++counts[0];
- productIndex.commitTransaction();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception archiving event "
- + event.getEventId() + ", rolling back", e);
- productIndex.rollbackTransaction();
- }
- }
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception running event archive policy ("
- + policy.getName() + ") ", e);
- }
- }
- if (policy instanceof ProductArchivePolicy) {
- ProductArchivePolicy productPolicy = (ProductArchivePolicy) policy;
- // -- Purge expired products for this policy -- //
- LOGGER.fine("[" + getName()
- + "] running product archive policy ("
- + policy.getName() + ")");
- try {
- // Get a list of those products
- List<ProductSummary> expiredProducts;
- if (productPolicy.isOnlyUnassociated()) {
- expiredProducts = productIndex
- .getUnassociatedProducts(query);
- } else {
- expiredProducts = productIndex.getProducts(query);
- }
- // Loop over list of expired products and remove each one
- Iterator<ProductSummary> productIter = expiredProducts
- .iterator();
- while (productIter.hasNext()) {
- ProductSummary product = productIter.next();
- LOGGER.info("[" + getName() + "] archiving product "
- + product.getId().toString());
- productIndex.beginTransaction();
- try {
- removeSummary(product);
- // Notify of the product archived
- IndexerEvent notification = new IndexerEvent(this);
- notification.setSummary(product);
- notification.addIndexerChange(new IndexerChange(
- IndexerChange.PRODUCT_ARCHIVED, null, null));
- notifyListeners(notification);
- ++counts[1];
- productIndex.commitTransaction();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception archiving event "
- + product.getId().toString() + ", rolling back", e);
- productIndex.rollbackTransaction();
- }
- }
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception running product archive policy ("
- + policy.getName() + ")", e);
- }
- }
- }
- return counts;
- }
- /**
- * Removes the given event from the Indexer ProductIndex and ProductStorage.
- *
- * @param event event to remove
- * @throws Exception
- * If errors occur while removing the event
- */
- protected synchronized void removeEvent(Event event) throws Exception {
- // Removing an "event" from storage is really just removing all its
- // associated products
- List<ProductSummary> summaries = event.getAllProductList();
- Iterator<ProductSummary> summaryIter = summaries.iterator();
- while (summaryIter.hasNext()) {
- ProductSummary summary = summaryIter.next();
- // Remove product from storage
- productStorage.removeProduct(summary.getId());
- // Remove product summary from index
- productIndex.removeProductSummary(summary);
- }
- // Remove from index
- productIndex.removeEvent(event);
- }
- /**
- * Removes the given summary from the Indexer ProductIndex and
- * ProductStorage.
- *
- * @param summary to remove
- * @throws Exception
- * If errors occur while removing the summary
- */
- protected synchronized void removeSummary(ProductSummary summary)
- throws Exception {
- Event event = getPrevEvent(summary);
- if (event != null) {
- List<ProductSummary> eventProducts = event.getAllProductList();
- if (eventProducts != null && eventProducts.size() == 1
- && eventProducts.get(0).getId().equals(summary.getId())) {
- // last product for the event
- removeEvent(event);
- // product is already removed by removeEvent
- return;
- }
- }
- // Remove product from storage
- productStorage.removeProduct(summary.getId());
- // Remove product summary from index
- productIndex.removeProductSummary(summary);
- // if product was associated to event need to update index
- if (event != null) {
- // remove the product from the event
- event.removeProduct(summary);
- // update event table
- ArrayList<Event> events = new ArrayList<Event>();
- events.add(event);
- productIndex.eventsUpdated(events);
- }
- }
- /**
- * Tries to create an event based on information in the given summary. If
- * successful, the summary is associated to the newly created event. Note:
- * The summary must have been externally added to the ProductIndex before
- * this method can be called.
- *
- * A product summary must have non-null (id) source and code, (location)
- * latitude and longitude, and (time) time, in order to have the minimum
- * properties required to create a new event.
- *
- * @param summary
- * The product summary serving as the basis for the new event.
- * @return The event that is created, added and associated or null if the
- * given summary can not be used to create a new event.
- * @throws Exception
- * If the ProductIndex.addEvent throws an exception or if the
- * ProductIndex.addAssociation throws an exception. This may
- * happen if this method is called before the summary is added
- * to the ProductIndex.
- */
- private synchronized Event createEvent(ProductSummary summary)
- throws Exception {
- if (Event.productHasOriginProperties(summary)) {
- Event event = productIndex.addEvent(new Event());
- return productIndex.addAssociation(event, summary);
- } else {
- return null;
- }
- }
- /**
- * Search for products in this index.
- *
- * @param request
- * the search request.
- * @return the search response.
- * @throws Exception if error occurs
- */
- public synchronized SearchResponse search(SearchRequest request)
- throws Exception {
- SearchResponse response = new SearchResponse();
- // Execute each query
- Iterator<SearchQuery> iter = request.getQueries().iterator();
- while (iter.hasNext()) {
- SearchQuery query = iter.next();
- if (query instanceof EventsSummaryQuery) {
- List<EventSummary> eventSummaries = new LinkedList<EventSummary>();
- Iterator<Event> events = productIndex.getEvents(
- query.getProductIndexQuery()).iterator();
- // convert events to event summaries
- while (events.hasNext()) {
- Event event = events.next();
- eventSummaries.add(event.getEventSummary());
- }
- ((EventsSummaryQuery) query).setResult(eventSummaries);
- }
- else if (query instanceof EventDetailQuery) {
- List<Event> events = productIndex.getEvents(query
- .getProductIndexQuery());
- ((EventDetailQuery) query).setResult(events);
- }
- else if (query instanceof ProductsSummaryQuery) {
- List<ProductSummary> products = productIndex.getProducts(query
- .getProductIndexQuery());
- ((ProductsSummaryQuery) query).setResult(products);
- }
- else if (query instanceof ProductDetailQuery) {
- List<Product> products = new LinkedList<Product>();
- Iterator<ProductId> ids = query.getProductIndexQuery()
- .getProductIds().iterator();
- // fetch products from storage
- while (ids.hasNext()) {
- ProductId id = ids.next();
- Product product = productStorage.getProduct(id);
- if (product != null) {
- products.add(product);
- }
- }
- ((ProductDetailQuery) query).setResult(products);
- }
- response.addResult(query);
- }
- return response;
- }
- /** @return disableArchive */
- public boolean isDisableArchive() {
- return disableArchive;
- }
- /** @param disableArchive boolean to set */
- public void setDisableArchive(boolean disableArchive) {
- this.disableArchive = disableArchive;
- }
- /**
- * @return the archiveInterval
- */
- public long getArchiveInterval() {
- return archiveInterval;
- }
- /**
- * @param archiveInterval
- * the archiveInterval to set
- */
- public void setArchiveInterval(long archiveInterval) {
- this.archiveInterval = archiveInterval;
- }
- /**
- * @return the archivePolicies
- */
- public List<ArchivePolicy> getArchivePolicies() {
- return archivePolicies;
- }
- }