ExternalNotificationListener.java

  1. /*
  2.  * ExternalNotificationListener
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import gov.usgs.earthquake.product.Content;
  6. import gov.usgs.earthquake.product.Product;
  7. import gov.usgs.earthquake.product.ProductId;
  8. import gov.usgs.util.Config;
  9. import gov.usgs.util.StreamUtils;
  10. import gov.usgs.util.XmlUtils;

  11. import java.io.File;

  12. import java.net.URI;

  13. import java.util.Iterator;
  14. import java.util.LinkedList;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.StringTokenizer;
  18. import java.util.logging.Level;
  19. import java.util.logging.Logger;

  20. /**
  21.  * An external process that is called when new products arrive.
  22.  *
  23.  * The ExternalNotificationListener implements the Configurable interface and
  24.  * can use the following configuration parameters:
  25.  *
  26.  * <dl>
  27.  * <dt>command</dt>
  28.  * <dd>(Required) The command to execute. This must be an executable command and
  29.  * may include arguments. Any product specific arguments are appended at the end
  30.  * of command.</dd>
  31.  *
  32.  * <dt>storage</dt>
  33.  * <dd>(Required) A directory used to store all products. Each product is
  34.  * extracted into a separate directory within this directory and is referenced
  35.  * by the --directory=/path/to/directory argument when command is executed.</dd>
  36.  * </dl>
  37.  *
  38.  */
  39. public class ExternalNotificationListener extends DefaultNotificationListener {

  40.     /** Logging object. */
  41.     private static final Logger LOGGER = Logger
  42.             .getLogger(ExternalNotificationListener.class.getName());

  43.     /** Configuration parameter for storage directory product. */
  44.     public static final String STORAGE_NAME_PROPERTY = "storage";

  45.     /** Configuration parameter for command. */
  46.     public static final String COMMAND_PROPERTY = "command";

  47.     /** Argument used to pass signature to external process. */
  48.     public static final String SIGNATURE_ARGUMENT = "--signature=";

  49.     private static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

  50.     /** Where products are stored in extracted form. */
  51.     private FileProductStorage storage;

  52.     /** Command that is executed after a product is stored. */
  53.     private String command;

  54.     /**
  55.      * Construct a new ExternalNotificationListener.
  56.      *
  57.      * The listener must be configured with a FileProductStorage and command to
  58.      * function.
  59.      */
  60.     public ExternalNotificationListener() {
  61.     }

  62.     /**
  63.      * Configure an ExternalNotificationListener using a Config object.
  64.      *
  65.      * @param config
  66.      *            the config containing a
  67.      */
  68.     public void configure(Config config) throws Exception {
  69.         super.configure(config);

  70.         command = config.getProperty(COMMAND_PROPERTY);
  71.         if (command == null) {
  72.             throw new ConfigurationException("[" + getName()
  73.                     + "] 'command' is a required configuration property");
  74.         }
  75.         LOGGER.config("[" + getName() + "] command is '" + command + "'");

  76.         // storage references an object in the global configuration
  77.         String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
  78.         String storageDirectory = config
  79.                 .getProperty(STORAGE_DIRECTORY_PROPERTY);
  80.         if (storageName == null && storageDirectory == null) {
  81.             throw new ConfigurationException("[" + getName()
  82.                     + "] 'storage' is a required configuration property.");
  83.         }
  84.         if (storageName != null) {
  85.             LOGGER.config("[" + getName() + "] loading FileProductStorage '"
  86.                     + storageName + "'");
  87.             storage = (FileProductStorage) Config.getConfig().getObject(
  88.                     storageName);
  89.             if (storage == null) {
  90.                 throw new ConfigurationException("[" + getName()
  91.                         + "] unable to load FileProductStorage '" + storageName
  92.                         + "'");
  93.             }
  94.         } else {
  95.             LOGGER.config("[" + getName() + "] using storage directory '"
  96.                     + storageDirectory + "'");
  97.             storage = new FileProductStorage(new File(storageDirectory));
  98.         }
  99.     }

  100.     /**
  101.      * Called when client is shutting down.
  102.      */
  103.     public void shutdown() throws Exception {
  104.         super.shutdown();
  105.         // maybe make current process a member and kill process?
  106.         // or find way of detaching so client process can exit but product
  107.         // process can complete?
  108.         storage.shutdown();
  109.     }

  110.     /**
  111.      * Called after client has been configured and should begin processing.
  112.      */
  113.     public void startup() throws Exception {
  114.         // no background threads to start or objects to create
  115.         storage.startup();
  116.         super.startup();
  117.     }

  118.     /**
  119.      * Append product arguments to the base command.
  120.      *
  121.      * @param product
  122.      *            the product used to generate arguments.
  123.      * @return command as a string.
  124.      * @throws Exception if error occurs
  125.      */
  126.     public String getProductCommand(final Product product) throws Exception {
  127.         StringBuffer buf = new StringBuffer(command);

  128.         ProductId id = product.getId();

  129.         // get path to product in storage, should be a directory
  130.         File productDirectory = storage.getProductFile(id);

  131.         buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT)
  132.                 .append(productDirectory.getCanonicalPath());

  133.         buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
  134.                 .append(id.getType());
  135.         buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
  136.                 .append(id.getCode());
  137.         buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
  138.                 .append(id.getSource());
  139.         buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
  140.                 .append(XmlUtils.formatDate(id.getUpdateTime()));
  141.         buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
  142.                 .append(product.getStatus());
  143.         if (product.isDeleted()) {
  144.             buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
  145.         }

  146.         if (product.getTrackerURL() != null) {
  147.             buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
  148.                     .append(product.getTrackerURL().toString());
  149.         }

  150.         Map<String, String> props = product.getProperties();
  151.         Iterator<String> iter = props.keySet().iterator();
  152.         while (iter.hasNext()) {
  153.             String name = iter.next();
  154.             buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT)
  155.                     .append(name).append("=")
  156.                     .append(props.get(name).replace("\"", "\\\"")).append("\"");
  157.         }

  158.         Map<String, List<URI>> links = product.getLinks();
  159.         iter = links.keySet().iterator();
  160.         while (iter.hasNext()) {
  161.             String relation = iter.next();
  162.             Iterator<URI> iter2 = links.get(relation).iterator();
  163.             while (iter2.hasNext()) {
  164.                 buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT)
  165.                         .append(relation).append("=")
  166.                         .append(iter2.next().toString());
  167.             }
  168.         }

  169.         Content content = product.getContents().get("");
  170.         if (content != null) {
  171.             buf.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
  172.             buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
  173.                     .append(content.getContentType());
  174.         }

  175.         if (product.getSignature() != null) {
  176.             buf.append(" ").append(SIGNATURE_ARGUMENT)
  177.                     .append(product.getSignature());
  178.         }

  179.         return buf.toString();
  180.     }

  181.     /**
  182.      * Split a command string into a command array.
  183.      *
  184.      * This version uses a StringTokenizer to split arguments. Quoted arguments
  185.      * are supported (single or double), with quotes removed before passing to
  186.      * runtime. Double quoting arguments will preserve quotes when passing to
  187.      * runtime.
  188.      *
  189.      * @param command
  190.      *            command to run.
  191.      * @return Array of arguments suitable for passing to
  192.      *         Runtime.exec(String[]).
  193.      */
  194.     protected static String[] splitCommand(final String command) {
  195.         List<String> arguments = new LinkedList<String>();
  196.         String currentArgument = null;

  197.         // use a tokenizer because that's how Runtime.exec does it currently...
  198.         StringTokenizer tokens = new StringTokenizer(command);
  199.         while (tokens.hasMoreTokens()) {
  200.             String token = tokens.nextToken();

  201.             if (currentArgument == null) {
  202.                 currentArgument = token;
  203.             } else {
  204.                 // continuing previous argument, that was split on whitespace
  205.                 currentArgument = currentArgument + " " + token;
  206.             }

  207.             if (currentArgument.startsWith("\"")) {
  208.                 // double quoted argument
  209.                 if (currentArgument.endsWith("\"")) {
  210.                     // that has balanced quotes
  211.                     // remove quotes and add argument
  212.                     currentArgument = currentArgument.substring(1,
  213.                             currentArgument.length() - 1);
  214.                 } else {
  215.                     // unbalanced quotes, keep going
  216.                     continue;
  217.                 }
  218.             } else if (currentArgument.startsWith("'")) {
  219.                 // single quoted argument
  220.                 if (currentArgument.endsWith("'")) {
  221.                     // that has balanced quotes
  222.                     // remove quotes and add argument
  223.                     currentArgument = currentArgument.substring(1,
  224.                             currentArgument.length() - 1);
  225.                 } else {
  226.                     // unbalanced quotes, keep going
  227.                     continue;
  228.                 }
  229.             }

  230.             arguments.add(currentArgument);
  231.             currentArgument = null;
  232.         }

  233.         if (currentArgument != null) {
  234.             // weird, but add argument anyways
  235.             arguments.add(currentArgument);
  236.         }

  237.         return arguments.toArray(new String[] {});
  238.     }

  239.     /**
  240.      * Call the external process for this product.
  241.      *
  242.      * @param product Product
  243.      * @throws Exception if error occurs
  244.      */
  245.     public void onProduct(final Product product) throws Exception {
  246.         // store product
  247.         try {
  248.             storage.storeProduct(product);
  249.         } catch (ProductAlreadyInStorageException e) {
  250.             LOGGER.info("[" + getName() + "] product already in storage "
  251.                     + product.getId().toString());
  252.         }

  253.         // now run command
  254.         String productCommand = null;
  255.         Process process = null;
  256.         int exitValue = -1;

  257.         try {
  258.             productCommand = getProductCommand(product);
  259.             LOGGER.info("[" + getName() + "] running command " + productCommand);
  260.             process = Runtime.getRuntime().exec(productCommand);

  261.             // inline product content, may or may not be null
  262.             Content content = product.getContents().get("");
  263.             if (content != null) {
  264.                 StreamUtils.transferStream(content.getInputStream(),
  265.                         process.getOutputStream());
  266.             } else {
  267.                 // need to close process stdin either way
  268.                 StreamUtils.closeStream(process.getOutputStream());
  269.             }

  270.             // maybe log/capture process input/error streams
  271.             // or switch to "Command"

  272.             exitValue = process.waitFor();
  273.         } catch (Exception e) {
  274.             if (process != null) {
  275.                 // make sure to kill zombies
  276.                 process.destroy();
  277.             }

  278.             // signal that process did not exit normally
  279.             exitValue = -1;

  280.             // give subclasses chance to handle exception
  281.             commandException(product, productCommand, e);
  282.         }

  283.         // if process exited normally
  284.         if (exitValue != -1) {
  285.             // give subclasses chance to handle exitValue, which may be non-zero
  286.             commandComplete(product, productCommand, exitValue);
  287.         }
  288.     }

  289.     /**
  290.      * Called when the command finishes executing normally.
  291.      *
  292.      * This implementation throws a NotificationListenerException if the
  293.      * exitValue is non-zero.
  294.      *
  295.      * @param product
  296.      *            the product being processed.
  297.      * @param command
  298.      *            the generated command, as a string.
  299.      * @param exitValue
  300.      *            the exit status of the process.
  301.      * @throws Exception
  302.      *             When re-notification should occur, based on maxTries, or none
  303.      *             if done.
  304.      */
  305.     public void commandComplete(final Product product, final String command,
  306.             final int exitValue) throws Exception {
  307.         LOGGER.info("[" + getName() + "] command '" + command
  308.                 + "' exited with status '" + exitValue + "'");

  309.         // send heartbeat info
  310.         HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
  311.         HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
  312.                 Integer.toString(exitValue));

  313.         if (exitValue != 0) {
  314.             throw new NotificationListenerException("[" + getName()
  315.                     + "] command exited with status " + exitValue);
  316.         }
  317.     }

  318.     /**
  319.      * Called when an exception occurs while running command.
  320.      *
  321.      * This implementation throws a NotificationListenerException with exception
  322.      * as the cause.
  323.      *
  324.      * @param product
  325.      *            product being processed
  326.      * @param productCommand
  327.      *            command that was built
  328.      * @param exception
  329.      *            exception that was thrown during execution. This will be an
  330.      *            InterruptedException if the process timed out.
  331.      * @throws Exception
  332.      *             When re-notification should occur, based on maxTries, or none
  333.      *             if done.
  334.      */
  335.     public void commandException(final Product product,
  336.             final String productCommand, final Exception exception)
  337.             throws Exception {
  338.         if (exception instanceof InterruptedException) {
  339.             LOGGER.warning("[" + getName() + "] command '" + productCommand
  340.                     + "' timed out");
  341.         } else {
  342.             LOGGER.log(Level.WARNING, "[" + getName()
  343.                     + "] exception running command '" + productCommand + "'",
  344.                     exception);
  345.         }

  346.         // send heartbeat info
  347.         HeartbeatListener.sendHeartbeatMessage(getName(), "exception",
  348.                 productCommand);
  349.         HeartbeatListener.sendHeartbeatMessage(getName(), "exception class",
  350.                 exception.getClass().getName());

  351.         throw new NotificationListenerException(exception);
  352.     }

  353.     /**
  354.      * @return the storage
  355.      */
  356.     public FileProductStorage getStorage() {
  357.         return storage;
  358.     }

  359.     /**
  360.      * @param storage
  361.      *            the storage to set
  362.      */
  363.     public void setStorage(FileProductStorage storage) {
  364.         this.storage = storage;
  365.     }

  366.     /**
  367.      * @return the command
  368.      */
  369.     public String getCommand() {
  370.         return command;
  371.     }

  372.     /**
  373.      * @param command
  374.      *            the command to set
  375.      */
  376.     public void setCommand(String command) {
  377.         this.command = command;
  378.     }

  379. }