ExternalIndexerListener.java

  1. /*
  2.  * ExternalIndexerListener
  3.  */
  4. package gov.usgs.earthquake.indexer;

  5. import gov.usgs.earthquake.distribution.CLIProductBuilder;
  6. import gov.usgs.earthquake.distribution.ConfigurationException;
  7. import gov.usgs.earthquake.distribution.ExternalNotificationListener;
  8. import gov.usgs.earthquake.distribution.FileProductStorage;
  9. import gov.usgs.earthquake.distribution.HeartbeatListener;
  10. import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
  11. import gov.usgs.earthquake.distribution.ProductStorage;
  12. import gov.usgs.earthquake.indexer.IndexerChange.IndexerChangeType;
  13. import gov.usgs.earthquake.product.Content;
  14. import gov.usgs.earthquake.product.Product;
  15. import gov.usgs.earthquake.product.ProductId;
  16. import gov.usgs.util.Config;
  17. import gov.usgs.util.StreamUtils;
  18. import gov.usgs.util.XmlUtils;

  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.net.URI;
  22. import java.util.Iterator;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.Timer;
  26. import java.util.TimerTask;
  27. import java.util.logging.Level;
  28. import java.util.logging.Logger;

  29. /**
  30.  * ExternalIndexerListener triggers external, non-Java listener processes.
  31.  *
  32.  * Provides a translation to a command-line interface
  33.  * for the product indexer to speak with external, non-Java listeners.
  34.  *
  35.  * As a child-class of the AbstractListener, this also accepts the following
  36.  * configuration parameters:
  37.  *
  38.  * <dl>
  39.  * <dt>command</dt>
  40.  * <dd>(Required) The command to execute. This must be an executable command and
  41.  * may include arguments. Any product-specific arguments are appended at the end
  42.  * of command.</dd>
  43.  *
  44.  * <dt>storage</dt>
  45.  * <dd>(Required) A directory used to store all products. Each product is
  46.  * extracted into a separate directory within this directory and is referenced
  47.  * by the --directory=/path/to/directory argument when command is executed.</dd>
  48.  *
  49.  * <dt>processUnassociated</dt>
  50.  * <dd>(Optional, Default = false) Whether or not to process unassociated
  51.  * products. Valid values are "true" and "false".</dd>
  52.  *
  53.  * <dt>processPreferredOnly</dt>
  54.  * <dd>(Optional, Default = false) Whether or not to process only preferred
  55.  * products of the type accepted by this listener. Valid values are "true" and
  56.  * "false".</dd>
  57.  *
  58.  * <dt>autoArchive</dt>
  59.  * <dd>(Optional, Default = true) Whether or not to archive products from
  60.  * storage when they are archived by the indexer.</dd>
  61.  *
  62.  * </dl>
  63.  */
  64. public class ExternalIndexerListener extends DefaultIndexerListener {

  65.     private static final Logger LOGGER = Logger
  66.             .getLogger(ExternalIndexerListener.class.getName());

  67.     /** Argument for event action */
  68.     public static final String EVENT_ACTION_ARGUMENT = "--action=";
  69.     /** Argument for event ids */
  70.     public static final String EVENT_IDS_ARGUMENT = "--eventids=";

  71.     /** Argument for preferred event id */
  72.     public static final String PREFERRED_ID_ARGUMENT = "--preferred-eventid=";
  73.     /** Argument for preferred eventsource */
  74.     public static final String PREFERRED_EVENTSOURCE_ARGUMENT = "--preferred-eventsource=";
  75.     /** Argument for preferred eventsourcecode */
  76.     public static final String PREFERRED_EVENTSOURCECODE_ARGUMENT = "--preferred-eventsourcecode=";
  77.     /** Argument for preferred magnitude */
  78.     public static final String PREFERRED_MAGNITUDE_ARGUMENT = "--preferred-magnitude=";
  79.     /** Argument for preferred longitude */
  80.     public static final String PREFERRED_LONGITUDE_ARGUMENT = "--preferred-longitude=";
  81.     /** Argument for preferred latitude */
  82.     public static final String PREFERRED_LATITUDE_ARGUMENT = "--preferred-latitude=";
  83.     /** Argument for preferred depth */
  84.     public static final String PREFERRED_DEPTH_ARGUMENT = "--preferred-depth=";
  85.     /** Argument for preferred eventitme */
  86.     public static final String PREFERRED_ORIGIN_TIME_ARGUMENT = "--preferred-eventtime=";
  87.     /** Configuration parameter for storage directory product. */
  88.     public static final String STORAGE_NAME_PROPERTY = "storage";

  89.     /** Short circuit to directly configure storage directory. */
  90.     public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

  91.     /** Configuration parameter for command. */
  92.     public static final String COMMAND_PROPERTY = "command";

  93.     /** Configuration parameter for autoArchive. */
  94.     public static final String AUTO_ARCHIVE_PROPERTY = "autoArchive";
  95.     /** Default state for auto archive */
  96.     public static final String AUTO_ARCHIVE_DEFAULT = "true";

  97.     /** Argument used to pass signature to external process. */
  98.     public static final String SIGNATURE_ARGUMENT = "--signature=";

  99.     /** Where products are stored in extracted form. */
  100.     private FileProductStorage storage;

  101.     /** Command that is executed after a product is stored. */
  102.     private String command;

  103.     /** Archive products from listener storage when archived by indexer. */
  104.     private boolean autoArchive = false;

  105.     /**
  106.      * Construct a new ExternalIndexerListener object
  107.      *
  108.      * The listener must be configured with a FileProductStorage and a command
  109.      * to function.
  110.      */
  111.     public ExternalIndexerListener() {
  112.         super();
  113.     }

  114.     /*
  115.      * (non-Javadoc)
  116.      *
  117.      * @see gov.usgs.earthquake.indexer.IndexerListener#onIndexerEvent(gov.usgs.
  118.      * earthquake.indexer.IndexerEvent)
  119.      */
  120.     public void onIndexerEvent(IndexerEvent change) throws Exception {
  121.         // Only handle products that are specifically included, unless there are
  122.         // no specified inclusions, and do not handle products that are
  123.         // specifically excluded.
  124.         if (accept(change)) {
  125.             // store product first
  126.             Product product = storeProduct(change.getProduct());

  127.             for (Iterator<IndexerChange> changeIter = change
  128.                     .getIndexerChanges().iterator(); changeIter.hasNext();) {
  129.                 IndexerChange indexerChange = changeIter.next();

  130.                 // check if we should process this change
  131.                 if (!accept(change, indexerChange)) {
  132.                     continue;
  133.                 }

  134.                 // build command
  135.                 final String indexerCommand = getProductSummaryCommand(change,
  136.                         indexerChange);

  137.                 runProductCommand(indexerCommand, product);
  138.             }
  139.         }

  140.         if (autoArchive) {
  141.             Iterator<IndexerChange> changeIter = change.getIndexerChanges()
  142.                     .iterator();
  143.             ProductStorage storage = getStorage();
  144.             while (changeIter.hasNext()) {
  145.                 IndexerChange nextChange = changeIter.next();
  146.                 if (nextChange.getType() == IndexerChangeType.PRODUCT_ARCHIVED) {
  147.                     // one product being archived
  148.                     if (change.getSummary() != null) {
  149.                         ProductId productId = change.getSummary().getId();
  150.                         LOGGER.log(Level.FINER,
  151.                                 "[" + getName() + "] auto archiving product "
  152.                                         + productId.toString());
  153.                         storage.removeProduct(productId);
  154.                     }
  155.                 } else if (nextChange.getType() == IndexerChangeType.EVENT_ARCHIVED) {
  156.                     // all products on event being archived
  157.                     Event changeEvent = nextChange.getOriginalEvent();
  158.                     LOGGER.log(Level.FINER,
  159.                             "[" + getName() + "] auto archiving event "
  160.                                     + changeEvent.getEventId() + " products");
  161.                     Iterator<ProductSummary> productIter = changeEvent
  162.                             .getAllProductList().iterator();
  163.                     while (productIter.hasNext()) {
  164.                         ProductId productId = productIter.next().getId();
  165.                         LOGGER.log(Level.FINER,
  166.                                 "[" + getName() + "] auto archiving product "
  167.                                         + productId.toString());
  168.                         storage.removeProduct(productId);
  169.                     }
  170.                 }
  171.             }
  172.         }
  173.     }

  174.     /**
  175.      * Store product associated with the change.
  176.      *
  177.      * @param product product to be stored.
  178.      * @return a new product object, read from the listener storage.
  179.      * @throws Exception if error occurs
  180.      */
  181.     public Product storeProduct(final Product product) throws Exception {
  182.         Product listenerProduct = null;
  183.         try {
  184.             if (product != null) {
  185.                 getStorage().storeProduct(product);
  186.                 listenerProduct = getStorage().getProduct(product.getId());
  187.             } else {
  188.                 LOGGER.finer("[" + getName()
  189.                         + "] Change product is null. Probably archiving.");
  190.             }
  191.         } catch (ProductAlreadyInStorageException paise) {
  192.             LOGGER.info("[" + getName() + "] product already in storage");
  193.             // keep going anyways, but load from local storage
  194.             listenerProduct = getStorage().getProduct(product.getId());
  195.         }

  196.         return listenerProduct;
  197.     }

  198.     /**
  199.      * Run a product command.
  200.      *
  201.      * @param command command and arguments.
  202.      * @param product product, when set and empty content (path "") is defined,
  203.      *        the content is provided to the command on stdin.
  204.      * @throws Exception if error occurs
  205.      */
  206.     public void runProductCommand(final String command, final Product product) throws Exception {
  207.         // execute
  208.         LOGGER.info("[" + getName() + "] running command " + command);
  209.         final Process process = Runtime.getRuntime().exec(command);

  210.         // Stream content over stdin if it exists
  211.         if (product != null) {
  212.             Content content = product.getContents().get("");
  213.             if (content != null) {
  214.                 StreamUtils.transferStream(content.getInputStream(),
  215.                         process.getOutputStream());
  216.             }
  217.         }

  218.         // Close the output stream
  219.         StreamUtils.closeStream(process.getOutputStream());

  220.         final Timer commandTimer;
  221.         if (this.getTimeout() > 0) {
  222.              commandTimer = new Timer();
  223.             // Schedule process destruction for commandTimeout
  224.             // milliseconds in the future
  225.             commandTimer.schedule(new TimerTask() {
  226.                 public void run() {
  227.                     LOGGER.warning("[" + getName()
  228.                             + "] command timeout '" + command
  229.                             + "', destroying process.");
  230.                     process.destroy();
  231.                 }
  232.             }, this.getTimeout());
  233.         } else {
  234.             commandTimer = null;
  235.         }

  236.         try {
  237.             // Wait for process to complete
  238.             process.waitFor();
  239.         } finally {
  240.             if (commandTimer != null) {
  241.                 // Cancel the timer if it was not triggered
  242.                 commandTimer.cancel();
  243.             }
  244.         }
  245.         LOGGER.info("[" + getName() + "] command '" + command
  246.                 + "' exited with status '" + process.exitValue() + "'");
  247.         if (process.exitValue() != 0) {
  248.             byte[] errorOutput = StreamUtils.readStream(process.getErrorStream());
  249.             LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" +
  250.                     new String(errorOutput) + "'");
  251.         }
  252.         StreamUtils.closeStream(process.getErrorStream());

  253.         // send heartbeat info
  254.         HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
  255.         HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
  256.                 Integer.toString(process.exitValue()));
  257.     }

  258.     /**
  259.      * Get the product command and add the indexer arguments to it.
  260.      *
  261.      * @param change
  262.      *            The IndexerEvent received by the ExternalIndexerListener
  263.      * @param indexerChange
  264.      *            The IndexerChange
  265.      * @return the command to execute with its arguments as a string
  266.      * @throws Exception if error occurs
  267.      */
  268.     public String getProductSummaryCommand(IndexerEvent change,
  269.             IndexerChange indexerChange) throws Exception {
  270.         ProductSummary summary = change.getSummary();

  271.         Event event = indexerChange.getNewEvent();
  272.         // When archiving events include event information
  273.         if (event == null && indexerChange.getType() == IndexerChangeType.EVENT_ARCHIVED) {
  274.             event = indexerChange.getOriginalEvent();
  275.         }

  276.         String command = getProductSummaryCommand(event, summary);

  277.         // Tells external indexer what type of index event occurred.
  278.         command = command + " " +
  279.                 ExternalIndexerListener.EVENT_ACTION_ARGUMENT +
  280.                 indexerChange.getType().toString();

  281.         return command;
  282.     }

  283.     /**
  284.      * Get the command for a specific event and summary.
  285.      *
  286.      * @param event Specific event
  287.      * @param summary Specific product summary
  288.      * @return command line arguments as a string.
  289.      * @throws Exception if error occurs
  290.      */
  291.     public String getProductSummaryCommand(Event event, ProductSummary summary) throws Exception {
  292.         StringBuffer indexerCommand = new StringBuffer(getCommand());

  293.         if (event != null) {
  294.             indexerCommand.append(getEventArguments(event));
  295.         }
  296.         if (summary != null) {
  297.             indexerCommand.append(getProductSummaryArguments(summary));
  298.         }


  299.         Product product = null;
  300.         try {
  301.             product = getStorage().getProduct(summary.getId());
  302.         } catch (Exception e) {
  303.             // when archiving product may not exist
  304.             LOGGER.log(
  305.                     Level.FINE,
  306.                     "Exception retreiving product from storage, probably archiving",
  307.                     e);
  308.         }
  309.         if (product != null) {
  310.             // Can only add these arguments if there is a product
  311.             Content content = product.getContents().get("");
  312.             if (content != null) {
  313.                 indexerCommand.append(" ").append(
  314.                         CLIProductBuilder.CONTENT_ARGUMENT);
  315.                 indexerCommand.append(" ")
  316.                         .append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
  317.                         .append(content.getContentType());
  318.             }

  319.             if (product.getSignature() != null) {
  320.                 indexerCommand
  321.                         .append(" ")
  322.                         .append(ExternalNotificationListener.SIGNATURE_ARGUMENT)
  323.                         .append(product.getSignature());
  324.             }

  325.         }

  326.         return indexerCommand.toString();
  327.     }

  328.     /**
  329.      * Get command line arguments for an event.
  330.      *
  331.      * @param event the event
  332.      * @return command line arguments
  333.      */
  334.     public String getEventArguments(final Event event) {
  335.         StringBuffer buf = new StringBuffer();

  336.         EventSummary eventSummary = event.getEventSummary();
  337.         buf.append(" ")
  338.                 .append(ExternalIndexerListener.PREFERRED_ID_ARGUMENT)
  339.                 .append(eventSummary.getId());
  340.         buf.append(" ")
  341.                 .append(ExternalIndexerListener.PREFERRED_EVENTSOURCE_ARGUMENT)
  342.                 .append(eventSummary.getSource());
  343.         buf.append(" ")
  344.                 .append(ExternalIndexerListener.PREFERRED_EVENTSOURCECODE_ARGUMENT)
  345.                 .append(eventSummary.getSourceCode());
  346.         Map<String, List<String>> eventids = event.getAllEventCodes(true);
  347.         Iterator<String> sourceIter = eventids.keySet().iterator();
  348.         buf.append(" ").append(EVENT_IDS_ARGUMENT);
  349.         while (sourceIter.hasNext()) {
  350.             String source = sourceIter.next();
  351.             Iterator<String> sourceCodeIter = eventids.get(source).iterator();
  352.             while (sourceCodeIter.hasNext()) {
  353.                 String sourceCode = sourceCodeIter.next();
  354.                 buf.append(source).append(sourceCode);
  355.                 if (sourceCodeIter.hasNext() || sourceIter.hasNext()) {
  356.                     buf.append(",");
  357.                 }
  358.             }
  359.         }

  360.         buf.append(" ").append(PREFERRED_MAGNITUDE_ARGUMENT)
  361.                 .append(eventSummary.getMagnitude());
  362.         buf.append(" ").append(PREFERRED_LATITUDE_ARGUMENT)
  363.                 .append(eventSummary.getLatitude());
  364.         buf.append(" ").append(PREFERRED_LONGITUDE_ARGUMENT)
  365.                 .append(eventSummary.getLongitude());
  366.         buf.append(" ").append(PREFERRED_DEPTH_ARGUMENT)
  367.                 .append(eventSummary.getDepth());
  368.         String eventTime = null;
  369.         if (event.getTime() != null) {
  370.             eventTime = XmlUtils.formatDate(event.getTime());
  371.         }
  372.         buf.append(" ").append(PREFERRED_ORIGIN_TIME_ARGUMENT)
  373.                 .append(eventTime);

  374.         return buf.toString();
  375.     }

  376.     /**
  377.      * Get command line arguments for a product summary.
  378.      *
  379.      * @param summary the product summary
  380.      * @return command line arguments
  381.      * @throws IOException if IO error occurs
  382.      */
  383.     public String getProductSummaryArguments(final ProductSummary summary) throws IOException {
  384.         StringBuffer buf = new StringBuffer();

  385.         File productDirectory = getStorage().getProductFile(summary.getId());
  386.         if (productDirectory.exists()) {
  387.             // Add the directory argument
  388.             buf.append(" ")
  389.                     .append(CLIProductBuilder.DIRECTORY_ARGUMENT)
  390.                     .append(productDirectory.getCanonicalPath());
  391.         }

  392.         // Add arguments from summary
  393.         buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
  394.                 .append(summary.getType());
  395.         buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
  396.                 .append(summary.getCode());
  397.         buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
  398.                 .append(summary.getSource());
  399.         buf.append(" ")
  400.                 .append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
  401.                 .append(XmlUtils.formatDate(summary.getUpdateTime()));
  402.         buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
  403.                 .append(summary.getStatus());
  404.         if (summary.isDeleted()) {
  405.             buf.append(" ")
  406.                     .append(CLIProductBuilder.DELETE_ARGUMENT);
  407.         }

  408.         // Add optional tracker URL argument
  409.         if (summary.getTrackerURL() != null) {
  410.             buf.append(" ")
  411.                     .append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
  412.                     .append(summary.getTrackerURL());
  413.         }

  414.         // Add property arguments
  415.         Map<String, String> props = summary.getProperties();
  416.         Iterator<String> iter = props.keySet().iterator();
  417.         while (iter.hasNext()) {
  418.             String name = iter.next();
  419.             buf.append(" \"")
  420.                     .append(CLIProductBuilder.PROPERTY_ARGUMENT).append(name)
  421.                     .append("=").append(props.get(name).replace("\"", "\\\""))
  422.                     .append("\"");
  423.         }

  424.         // Add link arguments
  425.         Map<String, List<URI>> links = summary.getLinks();
  426.         iter = links.keySet().iterator();
  427.         while (iter.hasNext()) {
  428.             String relation = iter.next();
  429.             Iterator<URI> iter2 = links.get(relation).iterator();
  430.             while (iter2.hasNext()) {
  431.                 buf.append(" ")
  432.                         .append(CLIProductBuilder.LINK_ARGUMENT)
  433.                         .append(relation).append("=")
  434.                         .append(iter2.next().toString());
  435.             }
  436.         }

  437.         return buf.toString();
  438.     }

  439.     /**
  440.      * Configure an ExternalNotificationListener using a Config object.
  441.      *
  442.      * @param config
  443.      *            the config containing a
  444.      */
  445.     public void configure(Config config) throws Exception {
  446.         super.configure(config);

  447.         command = config.getProperty(COMMAND_PROPERTY);
  448.         if (command == null) {
  449.             throw new ConfigurationException("[" + getName()
  450.                     + "] 'command' is a required configuration property");
  451.         }
  452.         LOGGER.config("[" + getName() + "] command is '" + command + "'");

  453.         // storage references an object in the global configuration
  454.         String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
  455.         String directoryName = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
  456.         if (storageName == null && directoryName == null) {
  457.             throw new ConfigurationException("[" + getName()
  458.                     + "] one of 'storage' or 'storageDirectory' is required");
  459.         }

  460.         if (storageName != null) {
  461.             LOGGER.config("[" + getName() + "] loading FileProductStorage '"
  462.                     + storageName + "'");
  463.             storage = (FileProductStorage) Config.getConfig().getObject(
  464.                     storageName);
  465.             if (storage == null) {
  466.                 throw new ConfigurationException("[" + getName()
  467.                         + "] unable to load FileProductStorage '" + storageName
  468.                         + "'");
  469.             }
  470.         } else {
  471.             LOGGER.config("[" + getName() + "] using storage directory '"
  472.                     + directoryName + "'");
  473.             storage = new FileProductStorage(new File(directoryName));
  474.             storage.setName(getName() + "-storage");
  475.         }

  476.         autoArchive = Boolean.valueOf(config.getProperty(AUTO_ARCHIVE_PROPERTY,
  477.                 AUTO_ARCHIVE_DEFAULT));
  478.         LOGGER.config("[" + getName() + "] autoArchive = " + autoArchive);
  479.     }

  480.     /**
  481.      * Called when client is shutting down.
  482.      */
  483.     public void shutdown() throws Exception {
  484.         super.shutdown();
  485.         // maybe make current process a member and kill process?
  486.         // or find way of detaching so client process can exit but product
  487.         // process can complete?
  488.         storage.shutdown();
  489.     }

  490.     /**
  491.      * Called after client has been configured and should begin processing.
  492.      */
  493.     public void startup() throws Exception {
  494.         // no background threads to start or objects to create
  495.         storage.startup();
  496.         super.startup();
  497.     }

  498.     /**
  499.      * @return the storage
  500.      */
  501.     public FileProductStorage getStorage() {
  502.         return storage;
  503.     }

  504.     /**
  505.      * @param storage
  506.      *            the storage to set
  507.      */
  508.     public void setStorage(FileProductStorage storage) {
  509.         this.storage = storage;
  510.     }

  511.     /**
  512.      * @return the command
  513.      */
  514.     public String getCommand() {
  515.         return command;
  516.     }

  517.     /**
  518.      * @param command
  519.      *            the command to set
  520.      */
  521.     public void setCommand(String command) {
  522.         this.command = command;
  523.     }

  524.     /**
  525.      * @return the autoArchive
  526.      */
  527.     public boolean isAutoArchive() {
  528.         return autoArchive;
  529.     }

  530.     /**
  531.      * @param autoArchive
  532.      *            the autoArchive to set
  533.      */
  534.     public void setAutoArchive(boolean autoArchive) {
  535.         this.autoArchive = autoArchive;
  536.     }

  537. }