DefaultIndexerListener.java
package gov.usgs.earthquake.indexer;
import gov.usgs.earthquake.indexer.IndexerChange.IndexerChangeType;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.CompareUtil;
import gov.usgs.util.Config;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;
/**
* DefaultIndexerListener provides a starting point from which all
* IndexerListeners may extend.
*
* As a child-class of the AbstractListener, this may be configured with all of
* the parent parameters and also accepts the following:
*
* <dl>
* <dt>command</dt>
* <dd>(Required) The command to execute. This must be an executable command and
* may include arguments. Any product-specific arguments are appended at the end
* of command.</dd>
*
* <dt>storage</dt>
* <dd>(Required) A directory used to store all products. Each product is
* extracted into a separate directory within this directory and is referenced
* by the --directory=/path/to/directory argument when command is executed.</dd>
*
* <dt>processUnassociated</dt>
* <dd>(Optional, Default = true) Whether or not to process unassociated
* products. Valid values are "true" and "false".</dd>
*
* <dt>processPreferredOnly</dt>
* <dd>(Optional, Default = false) Whether or not to process only preferred
* products of the type accepted by this listener. Valid values are "true" and
* "false".</dd>
*
* <dt>ignoreArchive</dt>
* <dd>(Optional, Default = true) Whether or not to ignore EVENT_ARCHIVED and
* PRODUCT_ARCHIVED indexer events. Valid values are "true" and "false".</dd>
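*
* <dt>processOnlyWhenEventChanged</dt>
* <dd>(Optional, Default = false) Whether or not to skip changes where both
* the original and the new event exist and their magnitude, latitude,
* longitude, depth, time and deleted status are all unchanged. Valid values
* are "true" and "false".</dd>
*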
* </dl>
*/
public class DefaultIndexerListener extends AbstractListener implements
        IndexerListener {

    /** Logging object. */
    private static final Logger LOGGER = Logger
            .getLogger(DefaultIndexerListener.class.getName());

    /** Configuration property name: process only preferred products. */
    public static final String PROCESS_PREFERRED_ONLY_PROPERTY = "processPreferredOnly";
    /** Value used when {@link #PROCESS_PREFERRED_ONLY_PROPERTY} is not configured. */
    public static final String PROCESS_PREFERRED_ONLY_DEFAULT = "false";

    /** Configuration property name: process unassociated products. */
    public static final String PROCESS_UNASSOCIATED_PROPERTY = "processUnassociated";
    /** Value used when {@link #PROCESS_UNASSOCIATED_PROPERTY} is not configured. */
    public static final String PROCESS_UNASSOCIATED_DEFAULT = "true";

    /** Configuration property name: process only when the event changed. */
    public static final String PROCESS_ONLY_WHEN_EVENT_CHANGE_PROPERTY = "processOnlyWhenEventChanged";
    /** Value used when {@link #PROCESS_ONLY_WHEN_EVENT_CHANGE_PROPERTY} is not configured. */
    public static final String PROCESS_ONLY_WHEN_EVENT_CHANGE_DEFAULT = "false";

    /** Configuration property name: ignore archive changes. */
    public static final String IGNORE_ARCHIVE_PROPERTY = "ignoreArchive";
    /** Value used when {@link #IGNORE_ARCHIVE_PROPERTY} is not configured. */
    public static final String IGNORE_ARCHIVE_DEFAULT = "true";

    /** Whether or not to process only preferred products. */
    private boolean processOnlyPreferredProducts = false;

    /** Whether or not to process unassociated products. */
    private boolean processUnassociatedProducts = true;

    /**
     * Whether or not to skip updates that don't change preferred event
     * parameters.
     */
    private boolean processOnlyWhenEventChanged = false;

    /**
     * Whether or not to ignore PRODUCT_ARCHIVED and EVENT_ARCHIVED changes.
     *
     * NOTE(review): this initial value (false) differs from
     * {@link #IGNORE_ARCHIVE_DEFAULT} ("true"), so an instance that is never
     * passed through {@link #configure(Config)} does not ignore archive
     * changes, while a configured instance without the property does. Left
     * unchanged to preserve behavior; confirm which default is intended.
     */
    private boolean ignoreArchive = false;

    /**
     * Logs a summary of every change contained in the event at INFO level.
     *
     * Each change contributes one line of the form
     * {@code TYPE originalEventId => newEventId}, where a missing original or
     * new event is written as {@code null}.
     *
     * @param event
     *            the indexer event whose changes are logged
     * @throws Exception if error occurs
     */
    @Override
    public void onIndexerEvent(IndexerEvent event) throws Exception {
        // single-threaded local buffer, so the unsynchronized builder suffices
        StringBuilder summary = new StringBuilder();
        for (IndexerChange change : event.getIndexerChanges()) {
            summary.append("\n").append(change.getType().toString()).append(" ");
            appendEventId(summary, change.getOriginalEvent());
            summary.append(" => ");
            appendEventId(summary, change.getNewEvent());
        }
        LOGGER.info(summary.toString());
    }

    /** Appends the event's id to the buffer, or the text "null" when there is no event. */
    private static void appendEventId(StringBuilder buffer, Event event) {
        if (event == null) {
            buffer.append("null");
        } else {
            buffer.append(event.getEventId());
        }
    }

    /**
     * Decides whether this listener handles the given indexer event.
     *
     * The event is rejected when it has a summary whose product id is not
     * accepted by the parent listener, when it has no events and unassociated
     * products are not processed, or when only preferred products are
     * processed and the product is not the preferred product of its type in
     * any of the events.
     *
     * @param change
     *            the indexer event that has occurred
     * @return whether this external indexer listener handles this product type
     * @throws Exception if error occurs
     */
    public boolean accept(IndexerEvent change) throws Exception {
        ProductId productId = null;
        String productType = null;
        if (change.getSummary() != null) {
            productId = change.getSummary().getId();
            productType = productId.getType();
            // use default notification listener first
            if (!super.accept(productId)) {
                return false;
            }
        }

        List<Event> events = change.getEvents();
        if (!processUnassociatedProducts && events.isEmpty()) {
            LOGGER.fine("[" + getName() + "] product is unassociated");
            return false;
        }

        if (processOnlyPreferredProducts && !events.isEmpty()
                && !isPreferredInAnyEvent(events, productType, productId)) {
            LOGGER.fine("[" + getName()
                    + "] product is not preferred in any event");
            return false;
        }

        // accept by default
        return true;
    }

    /**
     * Checks whether the product is the preferred product of its type in at
     * least one event. Without a product type (no summary) it never is.
     */
    private static boolean isPreferredInAnyEvent(List<Event> events,
            String productType, ProductId productId) {
        if (productType == null) {
            return false;
        }
        for (Event event : events) {
            ProductSummary preferred = event.getPreferredProduct(productType);
            if (preferred != null && preferred.getId().equals(productId)) {
                // it is the most preferred product for this event
                return true;
            }
        }
        return false;
    }

    /**
     * Decides whether a single change of an indexer event should be processed.
     *
     * Returns false for PRODUCT_ARCHIVED and EVENT_ARCHIVED changes when
     * archive changes are ignored. When only event changes are processed and
     * both the original and the new event are present, returns false unless
     * magnitude, latitude, longitude, depth, time or deleted status differ.
     * Every other change is accepted. The {@code event} argument is not
     * consulted by this implementation.
     *
     * @param event an IndexerEvent
     * @param change an IndexerChange
     * @return whether the change should be processed
     * @throws Exception if error occurs
     */
    public boolean accept(IndexerEvent event, IndexerChange change)
            throws Exception {
        // check whether this is an archive indexer change
        if (ignoreArchive
                && (change.getType() == IndexerChangeType.PRODUCT_ARCHIVED
                        || change.getType() == IndexerChangeType.EVENT_ARCHIVED)) {
            return false;
        }

        // see if preferred event parameters have changed
        if (processOnlyWhenEventChanged) {
            Event originalEvent = change.getOriginalEvent();
            Event newEvent = change.getNewEvent();
            // a comparison is only possible when both sides exist
            if (originalEvent != null && newEvent != null
                    && !preferredParametersChanged(
                            originalEvent.getEventSummary(),
                            newEvent.getEventSummary())) {
                // preferred event parameters haven't changed
                return false;
            }
        }

        // accept changes by default
        return true;
    }

    /**
     * Checks whether magnitude, latitude, longitude, depth, time or deleted
     * status differ between the two summaries, in that order.
     */
    private static boolean preferredParametersChanged(EventSummary before,
            EventSummary after) {
        return CompareUtil.nullSafeCompare(before.getMagnitude(),
                        after.getMagnitude()) != 0
                || CompareUtil.nullSafeCompare(before.getLatitude(),
                        after.getLatitude()) != 0
                || CompareUtil.nullSafeCompare(before.getLongitude(),
                        after.getLongitude()) != 0
                || CompareUtil.nullSafeCompare(before.getDepth(),
                        after.getDepth()) != 0
                || CompareUtil.nullSafeCompare(before.getTime(),
                        after.getTime()) != 0
                || before.isDeleted() != after.isDeleted();
    }

    /**
     * Configures the parent listener, then reads this listener's boolean
     * properties, falling back to the matching {@code *_DEFAULT} constant
     * when a property is absent. Each resulting value is logged at CONFIG
     * level.
     *
     * @param config
     *            the configuration to read from
     * @throws Exception if error occurs
     */
    public void configure(Config config) throws Exception {
        super.configure(config);

        processOnlyPreferredProducts = Boolean.parseBoolean(config
                .getProperty(PROCESS_PREFERRED_ONLY_PROPERTY,
                        PROCESS_PREFERRED_ONLY_DEFAULT));
        LOGGER.config("[" + getName() + "] process only preferred products = "
                + processOnlyPreferredProducts);

        processUnassociatedProducts = Boolean.parseBoolean(config.getProperty(
                PROCESS_UNASSOCIATED_PROPERTY, PROCESS_UNASSOCIATED_DEFAULT));
        LOGGER.config("[" + getName() + "] process unassociated products = "
                + processUnassociatedProducts);

        processOnlyWhenEventChanged = Boolean.parseBoolean(config.getProperty(
                PROCESS_ONLY_WHEN_EVENT_CHANGE_PROPERTY,
                PROCESS_ONLY_WHEN_EVENT_CHANGE_DEFAULT));
        LOGGER.config("[" + getName() + "] process only when event changed = "
                + processOnlyWhenEventChanged);

        ignoreArchive = Boolean.parseBoolean(config.getProperty(
                IGNORE_ARCHIVE_PROPERTY, IGNORE_ARCHIVE_DEFAULT));
        LOGGER.config("[" + getName() + "] ignore archive changes = "
                + ignoreArchive);
    }

    /**
     * @return whether only preferred products are processed
     */
    public boolean getProcessOnlyPreferredProducts() {
        return processOnlyPreferredProducts;
    }

    /**
     * @param processOnlyPreferredProducts
     *            whether to process only preferred products
     */
    public void setProcessOnlyPreferredProducts(
            final boolean processOnlyPreferredProducts) {
        this.processOnlyPreferredProducts = processOnlyPreferredProducts;
    }

    /**
     * @param processUnassociatedProducts
     *            whether to process unassociated products
     */
    public void setProcessUnassociatedProducts(
            final boolean processUnassociatedProducts) {
        this.processUnassociatedProducts = processUnassociatedProducts;
    }

    /**
     * @return whether unassociated products are processed
     */
    public boolean getProcessUnassociatedProducts() {
        return processUnassociatedProducts;
    }

    /**
     * @return whether changes are skipped when preferred event parameters
     *         are unchanged
     */
    public boolean isProcessOnlyWhenEventChanged() {
        return processOnlyWhenEventChanged;
    }

    /**
     * @param processOnlyWhenEventChanged
     *            whether to skip changes that leave preferred event
     *            parameters unchanged
     */
    public void setProcessOnlyWhenEventChanged(
            boolean processOnlyWhenEventChanged) {
        this.processOnlyWhenEventChanged = processOnlyWhenEventChanged;
    }

    /**
     * @return whether archive changes are ignored
     */
    public boolean isIgnoreArchive() {
        return ignoreArchive;
    }

    /**
     * @param ignoreArchive
     *            whether to ignore archive changes
     */
    public void setIgnoreArchive(boolean ignoreArchive) {
        this.ignoreArchive = ignoreArchive;
    }
}