- /*
- * ExternalIndexerListener
- */
- package gov.usgs.earthquake.indexer;
- import gov.usgs.earthquake.distribution.CLIProductBuilder;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.earthquake.distribution.ExternalNotificationListener;
- import gov.usgs.earthquake.distribution.FileProductStorage;
- import gov.usgs.earthquake.distribution.HeartbeatListener;
- import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
- import gov.usgs.earthquake.distribution.ProductStorage;
- import gov.usgs.earthquake.indexer.IndexerChange.IndexerChangeType;
- import gov.usgs.earthquake.product.Content;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.util.Config;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.XmlUtils;
- import java.io.File;
- import java.io.IOException;
- import java.net.URI;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * ExternalIndexerListener triggers external, non-Java listener processes.
- *
- * Provides a translation to a command-line interface
- * for the product indexer to speak with external, non-Java listeners.
- *
- * As a child-class of the AbstractListener, this also accepts the following
- * configuration parameters:
- *
- * <dl>
- * <dt>command</dt>
- * <dd>(Required) The command to execute. This must be an executable command and
- * may include arguments. Any product-specific arguments are appended at the end
- * of command.</dd>
- *
- * <dt>storage</dt>
- * <dd>(Required) A directory used to store all products. Each product is
- * extracted into a separate directory within this directory and is referenced
- * by the --directory=/path/to/directory argument when command is executed.</dd>
- *
- * <dt>processUnassociated</dt>
- * <dd>(Optional, Default = false) Whether or not to process unassociated
- * products. Valid values are "true" and "false".</dd>
- *
- * <dt>processPreferredOnly</dt>
- * <dd>(Optional, Default = false) Whether or not to process only preferred
- * products of the type accepted by this listener. Valid values are "true" and
- * "false".</dd>
- *
- * <dt>autoArchive</dt>
- * <dd>(Optional, Default = true) Whether or not to archive products from
- * storage when they are archived by the indexer.</dd>
- *
- * </dl>
- */
- public class ExternalIndexerListener extends DefaultIndexerListener {
- private static final Logger LOGGER = Logger
- .getLogger(ExternalIndexerListener.class.getName());
- /** Argument for event action */
- public static final String EVENT_ACTION_ARGUMENT = "--action=";
- /** Argument for event ids */
- public static final String EVENT_IDS_ARGUMENT = "--eventids=";
- /** Argument for preferred event id */
- public static final String PREFERRED_ID_ARGUMENT = "--preferred-eventid=";
- /** Argument for preferred eventsource */
- public static final String PREFERRED_EVENTSOURCE_ARGUMENT = "--preferred-eventsource=";
- /** Argument for preferred eventsourcecode */
- public static final String PREFERRED_EVENTSOURCECODE_ARGUMENT = "--preferred-eventsourcecode=";
- /** Argument for preferred magnitude */
- public static final String PREFERRED_MAGNITUDE_ARGUMENT = "--preferred-magnitude=";
- /** Argument for preferred longitude */
- public static final String PREFERRED_LONGITUDE_ARGUMENT = "--preferred-longitude=";
- /** Argument for preferred latitude */
- public static final String PREFERRED_LATITUDE_ARGUMENT = "--preferred-latitude=";
- /** Argument for preferred depth */
- public static final String PREFERRED_DEPTH_ARGUMENT = "--preferred-depth=";
- /** Argument for preferred eventtime */
- public static final String PREFERRED_ORIGIN_TIME_ARGUMENT = "--preferred-eventtime=";
- /** Configuration parameter for storage directory product. */
- public static final String STORAGE_NAME_PROPERTY = "storage";
- /** Short circuit to directly configure storage directory. */
- public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";
- /** Configuration parameter for command. */
- public static final String COMMAND_PROPERTY = "command";
- /** Configuration parameter for autoArchive. */
- public static final String AUTO_ARCHIVE_PROPERTY = "autoArchive";
- /** Default state for auto archive */
- public static final String AUTO_ARCHIVE_DEFAULT = "true";
- /** Argument used to pass signature to external process. */
- public static final String SIGNATURE_ARGUMENT = "--signature=";
- /** Where products are stored in extracted form. */
- private FileProductStorage storage;
- /** Command that is executed after a product is stored. */
- private String command;
- /** Archive products from listener storage when archived by indexer. */
- private boolean autoArchive = false;
- /**
-  * Create an ExternalIndexerListener with no command or storage assigned.
-  *
-  * A command and a FileProductStorage must be supplied, through
-  * {@link #configure(Config)} or the corresponding setters, before the
-  * listener can function.
-  */
- public ExternalIndexerListener() {
-     // the superclass no-argument constructor is invoked implicitly
- }
- /**
-  * Handle an indexer event.
-  *
-  * When the event is accepted, its product is stored in this listener's
-  * storage and the configured command is run once for every accepted
-  * indexer change. Independently of that, when autoArchive is enabled,
-  * products reported as archived by the indexer are removed from this
-  * listener's storage.
-  *
-  * @param change the indexer event to process.
-  * @throws Exception if storing the product, running the command, or
-  *         removing a product from storage fails.
-  * @see IndexerListener#onIndexerEvent(IndexerEvent)
-  */
- public void onIndexerEvent(IndexerEvent change) throws Exception {
-     // accept(change) decides whether this listener handles the event
-     if (accept(change)) {
-         // the product is stored once, before any command runs
-         final Product stored = storeProduct(change.getProduct());
-         for (IndexerChange candidate : change.getIndexerChanges()) {
-             // individual changes can still be rejected
-             if (accept(change, candidate)) {
-                 final String indexerCommand = getProductSummaryCommand(
-                         change, candidate);
-                 runProductCommand(indexerCommand, stored);
-             }
-         }
-     }
-     if (!autoArchive) {
-         return;
-     }
-     final ProductStorage listenerStorage = getStorage();
-     for (IndexerChange archiveCandidate : change.getIndexerChanges()) {
-         if (archiveCandidate.getType() == IndexerChangeType.PRODUCT_ARCHIVED) {
-             // a single product was archived; it is identified by the
-             // event's summary, when one is present
-             if (change.getSummary() != null) {
-                 final ProductId archivedId = change.getSummary().getId();
-                 LOGGER.log(Level.FINER, "[" + getName()
-                         + "] auto archiving product "
-                         + archivedId.toString());
-                 listenerStorage.removeProduct(archivedId);
-             }
-         } else if (archiveCandidate.getType() == IndexerChangeType.EVENT_ARCHIVED) {
-             // every product on the original event is removed
-             final Event archivedEvent = archiveCandidate.getOriginalEvent();
-             LOGGER.log(Level.FINER, "[" + getName()
-                     + "] auto archiving event "
-                     + archivedEvent.getEventId() + " products");
-             for (ProductSummary eventProduct : archivedEvent.getAllProductList()) {
-                 final ProductId archivedId = eventProduct.getId();
-                 LOGGER.log(Level.FINER, "[" + getName()
-                         + "] auto archiving product "
-                         + archivedId.toString());
-                 listenerStorage.removeProduct(archivedId);
-             }
-         }
-     }
- }
- /**
-  * Store a product in the listener storage and read it back.
-  *
-  * A null product is logged and skipped. A product that is already in
-  * storage is not treated as an error; the stored copy is loaded instead.
-  *
-  * @param product product to be stored, may be null.
-  * @return the product as read from the listener storage, or null when
-  *         {@code product} is null.
-  * @throws Exception if storing or loading the product fails.
-  */
- public Product storeProduct(final Product product) throws Exception {
-     if (product == null) {
-         LOGGER.finer("[" + getName()
-                 + "] Change product is null. Probably archiving.");
-         return null;
-     }
-     try {
-         getStorage().storeProduct(product);
-         return getStorage().getProduct(product.getId());
-     } catch (ProductAlreadyInStorageException alreadyStored) {
-         LOGGER.info("[" + getName() + "] product already in storage");
-         // not fatal: fall back to the copy already in local storage
-         return getStorage().getProduct(product.getId());
-     }
- }
- /**
-  * Run a product command and wait for it to exit.
-  *
-  * The command line is handed to {@link Runtime#exec(String)}. When the
-  * listener timeout is greater than zero, a timer destroys the process
-  * after that many milliseconds. The timer is armed before any content is
-  * written to the process, so a command that never reads its stdin cannot
-  * block this method indefinitely. The process stdin, stdout and stderr
-  * streams are always closed before this method returns.
-  *
-  * @param command command and arguments.
-  * @param product product, when set and empty content (path "") is defined,
-  *        the content is provided to the command on stdin.
-  * @throws Exception if the process cannot be started, streaming content to
-  *         the process fails, or waiting for the process is interrupted.
-  */
- public void runProductCommand(final String command, final Product product) throws Exception {
-     LOGGER.info("[" + getName() + "] running command " + command);
-     final Process process = Runtime.getRuntime().exec(command);
-     try {
-         Timer commandTimer = null;
-         try {
-             if (this.getTimeout() > 0) {
-                 // Schedule process destruction for commandTimeout
-                 // milliseconds in the future
-                 commandTimer = new Timer();
-                 commandTimer.schedule(new TimerTask() {
-                     public void run() {
-                         LOGGER.warning("[" + getName()
-                                 + "] command timeout '" + command
-                                 + "', destroying process.");
-                         process.destroy();
-                     }
-                 }, this.getTimeout());
-             }
-             // Stream content over stdin if it exists
-             if (product != null) {
-                 Content content = product.getContents().get("");
-                 if (content != null) {
-                     StreamUtils.transferStream(content.getInputStream(),
-                             process.getOutputStream());
-                 }
-             }
-             // Close stdin so the command sees end of input
-             StreamUtils.closeStream(process.getOutputStream());
-             // Wait for process to complete
-             process.waitFor();
-         } finally {
-             if (commandTimer != null) {
-                 // Cancel the timer if it was not triggered
-                 commandTimer.cancel();
-             }
-         }
-         final int exitValue = process.exitValue();
-         LOGGER.info("[" + getName() + "] command '" + command
-                 + "' exited with status '" + exitValue + "'");
-         if (exitValue != 0) {
-             // stderr is only reported when the command failed
-             byte[] errorOutput = StreamUtils.readStream(process.getErrorStream());
-             LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" +
-                     new String(errorOutput) + "'");
-         }
-         // send heartbeat info
-         HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
-         HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
-                 Integer.toString(exitValue));
-     } finally {
-         // release every pipe to the child, including stdout which is
-         // never read, on both success and failure paths
-         StreamUtils.closeStream(process.getOutputStream());
-         StreamUtils.closeStream(process.getInputStream());
-         StreamUtils.closeStream(process.getErrorStream());
-     }
- }
- /**
-  * Build the command line for one indexer change.
-  *
-  * The event and summary arguments are produced by
-  * {@link #getProductSummaryCommand(Event, ProductSummary)}; the type of
-  * the indexer change is then appended as the action argument.
-  *
-  * @param change
-  *            The IndexerEvent received by the ExternalIndexerListener
-  * @param indexerChange
-  *            The IndexerChange
-  * @return the command to execute with its arguments as a string
-  * @throws Exception if error occurs
-  */
- public String getProductSummaryCommand(IndexerEvent change,
-         IndexerChange indexerChange) throws Exception {
-     final ProductSummary changeSummary = change.getSummary();
-     Event subject = indexerChange.getNewEvent();
-     if (subject == null
-             && indexerChange.getType() == IndexerChangeType.EVENT_ARCHIVED) {
-         // an archived event has no new event; describe the original one
-         subject = indexerChange.getOriginalEvent();
-     }
-     final String baseCommand = getProductSummaryCommand(subject, changeSummary);
-     // the action argument tells the external process what kind of
-     // index change occurred
-     return baseCommand + " "
-             + ExternalIndexerListener.EVENT_ACTION_ARGUMENT
-             + indexerChange.getType().toString();
- }
- /**
-  * Get the command for a specific event and summary.
-  *
-  * Starts from the configured command, then appends event arguments (when
-  * {@code event} is not null) and product summary arguments (when
-  * {@code summary} is not null). If the summarized product can be loaded
-  * from the listener storage, content and signature arguments are added
-  * as well.
-  *
-  * @param event Specific event, may be null.
-  * @param summary Specific product summary, may be null.
-  * @return command line arguments as a string.
-  * @throws Exception if error occurs
-  */
- public String getProductSummaryCommand(Event event, ProductSummary summary) throws Exception {
-     StringBuilder indexerCommand = new StringBuilder(getCommand());
-     if (event != null) {
-         indexerCommand.append(getEventArguments(event));
-     }
-     Product product = null;
-     if (summary != null) {
-         indexerCommand.append(getProductSummaryArguments(summary));
-         // only look up the product when there is a summary to identify
-         // it, rather than relying on a caught NullPointerException
-         try {
-             product = getStorage().getProduct(summary.getId());
-         } catch (Exception e) {
-             // when archiving product may not exist
-             LOGGER.log(
-                     Level.FINE,
-                     "Exception retreiving product from storage, probably archiving",
-                     e);
-         }
-     }
-     if (product != null) {
-         // Can only add these arguments if there is a product
-         Content content = product.getContents().get("");
-         if (content != null) {
-             indexerCommand.append(" ").append(
-                     CLIProductBuilder.CONTENT_ARGUMENT);
-             indexerCommand.append(" ")
-                     .append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
-                     .append(content.getContentType());
-         }
-         if (product.getSignature() != null) {
-             indexerCommand
-                     .append(" ")
-                     .append(ExternalNotificationListener.SIGNATURE_ARGUMENT)
-                     .append(product.getSignature());
-         }
-     }
-     return indexerCommand.toString();
- }
- /**
- * Get command line arguments for an event.
- *
- * @param event the event
- * @return command line arguments
- */
- public String getEventArguments(final Event event) {
- StringBuffer buf = new StringBuffer();
- EventSummary eventSummary = event.getEventSummary();
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_ID_ARGUMENT)
- .append(eventSummary.getId());
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_EVENTSOURCE_ARGUMENT)
- .append(eventSummary.getSource());
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_EVENTSOURCECODE_ARGUMENT)
- .append(eventSummary.getSourceCode());
- Map<String, List<String>> eventids = event.getAllEventCodes(true);
- Iterator<String> sourceIter = eventids.keySet().iterator();
- buf.append(" ").append(EVENT_IDS_ARGUMENT);
- while (sourceIter.hasNext()) {
- String source = sourceIter.next();
- Iterator<String> sourceCodeIter = eventids.get(source).iterator();
- while (sourceCodeIter.hasNext()) {
- String sourceCode = sourceCodeIter.next();
- buf.append(source).append(sourceCode);
- if (sourceCodeIter.hasNext() || sourceIter.hasNext()) {
- buf.append(",");
- }
- }
- }
- buf.append(" ").append(PREFERRED_MAGNITUDE_ARGUMENT)
- .append(eventSummary.getMagnitude());
- buf.append(" ").append(PREFERRED_LATITUDE_ARGUMENT)
- .append(eventSummary.getLatitude());
- buf.append(" ").append(PREFERRED_LONGITUDE_ARGUMENT)
- .append(eventSummary.getLongitude());
- buf.append(" ").append(PREFERRED_DEPTH_ARGUMENT)
- .append(eventSummary.getDepth());
- String eventTime = null;
- if (event.getTime() != null) {
- eventTime = XmlUtils.formatDate(event.getTime());
- }
- buf.append(" ").append(PREFERRED_ORIGIN_TIME_ARGUMENT)
- .append(eventTime);
- return buf.toString();
- }
- /**
-  * Get command line arguments for a product summary.
-  *
-  * The directory argument is included only when the product's file
-  * exists in the listener storage. Type, code, source, update time and
-  * status are always written; the delete flag and tracker URL are written
-  * when they apply. Each property is written as one double-quoted argument
-  * with embedded double quotes backslash-escaped, and each link as a
-  * relation=uri argument.
-  *
-  * @param summary the product summary
-  * @return command line arguments, beginning with a space.
-  * @throws IOException if IO error occurs
-  */
- public String getProductSummaryArguments(final ProductSummary summary) throws IOException {
-     final StringBuilder args = new StringBuilder();
-     final File extracted = getStorage().getProductFile(summary.getId());
-     if (extracted.exists()) {
-         // point the command at the extracted product
-         args.append(" ");
-         args.append(CLIProductBuilder.DIRECTORY_ARGUMENT);
-         args.append(extracted.getCanonicalPath());
-     }
-     // identifying fields taken directly from the summary
-     args.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
-             .append(summary.getType());
-     args.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
-             .append(summary.getCode());
-     args.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
-             .append(summary.getSource());
-     args.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
-             .append(XmlUtils.formatDate(summary.getUpdateTime()));
-     args.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
-             .append(summary.getStatus());
-     if (summary.isDeleted()) {
-         args.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
-     }
-     // tracker URL is optional
-     if (summary.getTrackerURL() != null) {
-         args.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
-                 .append(summary.getTrackerURL());
-     }
-     // one quoted argument per property
-     final Map<String, String> properties = summary.getProperties();
-     for (Map.Entry<String, String> property : properties.entrySet()) {
-         final String escapedValue = property.getValue().replace("\"", "\\\"");
-         args.append(" \"")
-                 .append(CLIProductBuilder.PROPERTY_ARGUMENT)
-                 .append(property.getKey())
-                 .append("=")
-                 .append(escapedValue)
-                 .append("\"");
-     }
-     // one argument per link, grouped by relation
-     final Map<String, List<URI>> linksByRelation = summary.getLinks();
-     for (Map.Entry<String, List<URI>> relationLinks : linksByRelation.entrySet()) {
-         final String relation = relationLinks.getKey();
-         for (URI link : relationLinks.getValue()) {
-             args.append(" ")
-                     .append(CLIProductBuilder.LINK_ARGUMENT)
-                     .append(relation)
-                     .append("=")
-                     .append(link.toString());
-         }
-     }
-     return args.toString();
- }
- /**
-  * Configure an ExternalIndexerListener using a Config object.
-  *
-  * Reads, in addition to whatever the superclass reads:
-  * <ul>
-  * <li>{@code command} (required): the command to execute.</li>
-  * <li>{@code storage}: name of a FileProductStorage object in the global
-  * configuration, or {@code storageDirectory}: a directory in which a new
-  * FileProductStorage is created. One of the two is required; when both
-  * are present {@code storage} is used.</li>
-  * <li>{@code autoArchive} (optional): parsed as a boolean, defaulting to
-  * {@link #AUTO_ARCHIVE_DEFAULT}.</li>
-  * </ul>
-  *
-  * @param config
-  *            the config containing the properties listed above.
-  * @throws Exception if the superclass configuration fails; a
-  *         ConfigurationException if a required property is missing or
-  *         the named storage cannot be loaded as a FileProductStorage.
-  */
- public void configure(Config config) throws Exception {
-     super.configure(config);
-     command = config.getProperty(COMMAND_PROPERTY);
-     if (command == null) {
-         throw new ConfigurationException("[" + getName()
-                 + "] 'command' is a required configuration property");
-     }
-     LOGGER.config("[" + getName() + "] command is '" + command + "'");
-     // storage references an object in the global configuration
-     String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
-     String directoryName = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
-     if (storageName == null && directoryName == null) {
-         throw new ConfigurationException("[" + getName()
-                 + "] one of 'storage' or 'storageDirectory' is required");
-     }
-     if (storageName != null) {
-         LOGGER.config("[" + getName() + "] loading FileProductStorage '"
-                 + storageName + "'");
-         final Object configured = Config.getConfig().getObject(storageName);
-         if (configured == null) {
-             throw new ConfigurationException("[" + getName()
-                     + "] unable to load FileProductStorage '" + storageName
-                     + "'");
-         }
-         // report a misconfigured object type as a configuration problem
-         // instead of letting a ClassCastException escape
-         if (!(configured instanceof FileProductStorage)) {
-             throw new ConfigurationException("[" + getName()
-                     + "] storage '" + storageName
-                     + "' is not a FileProductStorage");
-         }
-         storage = (FileProductStorage) configured;
-     } else {
-         LOGGER.config("[" + getName() + "] using storage directory '"
-                 + directoryName + "'");
-         storage = new FileProductStorage(new File(directoryName));
-         storage.setName(getName() + "-storage");
-     }
-     autoArchive = Boolean.parseBoolean(config.getProperty(
-             AUTO_ARCHIVE_PROPERTY, AUTO_ARCHIVE_DEFAULT));
-     LOGGER.config("[" + getName() + "] autoArchive = " + autoArchive);
- }
- /**
-  * Called when client is shutting down.
-  *
-  * Shuts down the superclass and then the listener storage. The storage is
-  * shut down even when the superclass shutdown throws, and is skipped when
-  * no storage was ever assigned.
-  *
-  * @throws Exception if the superclass or the storage fails to shut down.
-  */
- public void shutdown() throws Exception {
-     try {
-         super.shutdown();
-         // maybe make current process a member and kill process?
-         // or find way of detaching so client process can exit but product
-         // process can complete?
-     } finally {
-         // an unconfigured listener has no storage to release
-         if (storage != null) {
-             storage.shutdown();
-         }
-     }
- }
- /**
-  * Called after client has been configured and should begin processing.
-  *
-  * Starts the listener storage first, then the superclass.
-  *
-  * @throws Exception if the storage or the superclass fails to start.
-  */
- public void startup() throws Exception {
-     // this class starts nothing of its own beyond its storage
-     this.storage.startup();
-     super.startup();
- }
- /**
-  * Get the storage where this listener keeps products in extracted form.
-  *
-  * @return the FileProductStorage currently assigned, or null when none
-  *         has been set.
-  */
- public FileProductStorage getStorage() {
-     return this.storage;
- }
- /**
-  * Assign the storage where this listener keeps products in extracted
-  * form.
-  *
-  * @param storage
-  *            the FileProductStorage to use from now on.
-  */
- public void setStorage(FileProductStorage storage) {
-     this.storage = storage;
- }
- /**
-  * Get the command that is executed after a product is stored.
-  *
-  * @return the configured command, or null when none has been set.
-  */
- public String getCommand() {
-     return this.command;
- }
- /**
-  * Assign the command that is executed after a product is stored.
-  *
-  * @param command
-  *            the command, optionally including leading arguments.
-  */
- public void setCommand(String command) {
-     this.command = command;
- }
- /**
-  * Report whether products are removed from the listener storage when the
-  * indexer archives them.
-  *
-  * @return true when auto archiving is enabled.
-  */
- public boolean isAutoArchive() {
-     return this.autoArchive;
- }
- /**
-  * Enable or disable removal of products from the listener storage when
-  * the indexer archives them.
-  *
-  * @param autoArchive
-  *            true to enable auto archiving, false to disable it.
-  */
- public void setAutoArchive(boolean autoArchive) {
-     this.autoArchive = autoArchive;
- }
- }