ExternalNotificationListener.java
- /*
- * ExternalNotificationListener
- */
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.Content;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.util.Config;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.XmlUtils;
- import java.io.File;
- import java.net.URI;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.StringTokenizer;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * An external process that is called when new products arrive.
- *
- * The ExternalNotificationListener implements the Configurable interface and
- * can use the following configuration parameters:
- *
- * <dl>
- * <dt>command</dt>
- * <dd>(Required) The command to execute. This must be an executable command and
- * may include arguments. Any product specific arguments are appended at the end
- * of command.</dd>
- *
- * <dt>storage</dt>
- * <dd>(Required) A directory used to store all products. Each product is
- * extracted into a separate directory within this directory and is referenced
- * by the --directory=/path/to/directory argument when command is executed.</dd>
- * </dl>
- *
- */
- public class ExternalNotificationListener extends DefaultNotificationListener {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(ExternalNotificationListener.class.getName());
- /** Configuration parameter for storage directory product. */
- public static final String STORAGE_NAME_PROPERTY = "storage";
- /** Configuration parameter for command. */
- public static final String COMMAND_PROPERTY = "command";
- /** Argument used to pass signature to external process. */
- public static final String SIGNATURE_ARGUMENT = "--signature=";
- private static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";
- /** Where products are stored in extracted form. */
- private FileProductStorage storage;
- /** Command that is executed after a product is stored. */
- private String command;
- /**
- * Construct a new ExternalNotificationListener.
- *
- * The listener must be configured with a FileProductStorage and command to
- * function.
- */
- public ExternalNotificationListener() {
- }
- /**
- * Configure an ExternalNotificationListener using a Config object.
- *
- * @param config
- * the config containing a
- */
- public void configure(Config config) throws Exception {
- super.configure(config);
- command = config.getProperty(COMMAND_PROPERTY);
- if (command == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'command' is a required configuration property");
- }
- LOGGER.config("[" + getName() + "] command is '" + command + "'");
- // storage references an object in the global configuration
- String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
- String storageDirectory = config
- .getProperty(STORAGE_DIRECTORY_PROPERTY);
- if (storageName == null && storageDirectory == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'storage' is a required configuration property.");
- }
- if (storageName != null) {
- LOGGER.config("[" + getName() + "] loading FileProductStorage '"
- + storageName + "'");
- storage = (FileProductStorage) Config.getConfig().getObject(
- storageName);
- if (storage == null) {
- throw new ConfigurationException("[" + getName()
- + "] unable to load FileProductStorage '" + storageName
- + "'");
- }
- } else {
- LOGGER.config("[" + getName() + "] using storage directory '"
- + storageDirectory + "'");
- storage = new FileProductStorage(new File(storageDirectory));
- }
- }
- /**
- * Called when client is shutting down.
- */
- public void shutdown() throws Exception {
- super.shutdown();
- // maybe make current process a member and kill process?
- // or find way of detaching so client process can exit but product
- // process can complete?
- storage.shutdown();
- }
- /**
- * Called after client has been configured and should begin processing.
- */
- public void startup() throws Exception {
- // no background threads to start or objects to create
- storage.startup();
- super.startup();
- }
- /**
- * Append product arguments to the base command.
- *
- * @param product
- * the product used to generate arguments.
- * @return command as a string.
- * @throws Exception if error occurs
- */
- public String getProductCommand(final Product product) throws Exception {
- StringBuffer buf = new StringBuffer(command);
- ProductId id = product.getId();
- // get path to product in storage, should be a directory
- File productDirectory = storage.getProductFile(id);
- buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT)
- .append(productDirectory.getCanonicalPath());
- buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
- .append(id.getType());
- buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
- .append(id.getCode());
- buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
- .append(id.getSource());
- buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
- .append(XmlUtils.formatDate(id.getUpdateTime()));
- buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
- .append(product.getStatus());
- if (product.isDeleted()) {
- buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
- }
- if (product.getTrackerURL() != null) {
- buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
- .append(product.getTrackerURL().toString());
- }
- Map<String, String> props = product.getProperties();
- Iterator<String> iter = props.keySet().iterator();
- while (iter.hasNext()) {
- String name = iter.next();
- buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT)
- .append(name).append("=")
- .append(props.get(name).replace("\"", "\\\"")).append("\"");
- }
- Map<String, List<URI>> links = product.getLinks();
- iter = links.keySet().iterator();
- while (iter.hasNext()) {
- String relation = iter.next();
- Iterator<URI> iter2 = links.get(relation).iterator();
- while (iter2.hasNext()) {
- buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT)
- .append(relation).append("=")
- .append(iter2.next().toString());
- }
- }
- Content content = product.getContents().get("");
- if (content != null) {
- buf.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
- buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
- .append(content.getContentType());
- }
- if (product.getSignature() != null) {
- buf.append(" ").append(SIGNATURE_ARGUMENT)
- .append(product.getSignature());
- }
- return buf.toString();
- }
- /**
- * Split a command string into a command array.
- *
- * This version uses a StringTokenizer to split arguments. Quoted arguments
- * are supported (single or double), with quotes removed before passing to
- * runtime. Double quoting arguments will preserve quotes when passing to
- * runtime.
- *
- * @param command
- * command to run.
- * @return Array of arguments suitable for passing to
- * Runtime.exec(String[]).
- */
- protected static String[] splitCommand(final String command) {
- List<String> arguments = new LinkedList<String>();
- String currentArgument = null;
- // use a tokenizer because that's how Runtime.exec does it currently...
- StringTokenizer tokens = new StringTokenizer(command);
- while (tokens.hasMoreTokens()) {
- String token = tokens.nextToken();
- if (currentArgument == null) {
- currentArgument = token;
- } else {
- // continuing previous argument, that was split on whitespace
- currentArgument = currentArgument + " " + token;
- }
- if (currentArgument.startsWith("\"")) {
- // double quoted argument
- if (currentArgument.endsWith("\"")) {
- // that has balanced quotes
- // remove quotes and add argument
- currentArgument = currentArgument.substring(1,
- currentArgument.length() - 1);
- } else {
- // unbalanced quotes, keep going
- continue;
- }
- } else if (currentArgument.startsWith("'")) {
- // single quoted argument
- if (currentArgument.endsWith("'")) {
- // that has balanced quotes
- // remove quotes and add argument
- currentArgument = currentArgument.substring(1,
- currentArgument.length() - 1);
- } else {
- // unbalanced quotes, keep going
- continue;
- }
- }
- arguments.add(currentArgument);
- currentArgument = null;
- }
- if (currentArgument != null) {
- // weird, but add argument anyways
- arguments.add(currentArgument);
- }
- return arguments.toArray(new String[] {});
- }
- /**
- * Call the external process for this product.
- *
- * @param product Product
- * @throws Exception if error occurs
- */
- public void onProduct(final Product product) throws Exception {
- // store product
- try {
- storage.storeProduct(product);
- } catch (ProductAlreadyInStorageException e) {
- LOGGER.info("[" + getName() + "] product already in storage "
- + product.getId().toString());
- }
- // now run command
- String productCommand = null;
- Process process = null;
- int exitValue = -1;
- try {
- productCommand = getProductCommand(product);
- LOGGER.info("[" + getName() + "] running command " + productCommand);
- process = Runtime.getRuntime().exec(productCommand);
- // inline product content, may or may not be null
- Content content = product.getContents().get("");
- if (content != null) {
- StreamUtils.transferStream(content.getInputStream(),
- process.getOutputStream());
- } else {
- // need to close process stdin either way
- StreamUtils.closeStream(process.getOutputStream());
- }
- // maybe log/capture process input/error streams
- // or switch to "Command"
- exitValue = process.waitFor();
- } catch (Exception e) {
- if (process != null) {
- // make sure to kill zombies
- process.destroy();
- }
- // signal that process did not exit normally
- exitValue = -1;
- // give subclasses chance to handle exception
- commandException(product, productCommand, e);
- }
- // if process exited normally
- if (exitValue != -1) {
- // give subclasses chance to handle exitValue, which may be non-zero
- commandComplete(product, productCommand, exitValue);
- }
- }
- /**
- * Called when the command finishes executing normally.
- *
- * This implementation throws a NotificationListenerException if the
- * exitValue is non-zero.
- *
- * @param product
- * the product being processed.
- * @param command
- * the generated command, as a string.
- * @param exitValue
- * the exit status of the process.
- * @throws Exception
- * When re-notification should occur, based on maxTries, or none
- * if done.
- */
- public void commandComplete(final Product product, final String command,
- final int exitValue) throws Exception {
- LOGGER.info("[" + getName() + "] command '" + command
- + "' exited with status '" + exitValue + "'");
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
- HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
- Integer.toString(exitValue));
- if (exitValue != 0) {
- throw new NotificationListenerException("[" + getName()
- + "] command exited with status " + exitValue);
- }
- }
- /**
- * Called when an exception occurs while running command.
- *
- * This implementation throws a NotificationListenerException with exception
- * as the cause.
- *
- * @param product
- * product being processed
- * @param productCommand
- * command that was built
- * @param exception
- * exception that was thrown during execution. This will be an
- * InterruptedException if the process timed out.
- * @throws Exception
- * When re-notification should occur, based on maxTries, or none
- * if done.
- */
- public void commandException(final Product product,
- final String productCommand, final Exception exception)
- throws Exception {
- if (exception instanceof InterruptedException) {
- LOGGER.warning("[" + getName() + "] command '" + productCommand
- + "' timed out");
- } else {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception running command '" + productCommand + "'",
- exception);
- }
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(), "exception",
- productCommand);
- HeartbeatListener.sendHeartbeatMessage(getName(), "exception class",
- exception.getClass().getName());
- throw new NotificationListenerException(exception);
- }
- /**
- * @return the storage
- */
- public FileProductStorage getStorage() {
- return storage;
- }
- /**
- * @param storage
- * the storage to set
- */
- public void setStorage(FileProductStorage storage) {
- this.storage = storage;
- }
- /**
- * @return the command
- */
- public String getCommand() {
- return command;
- }
- /**
- * @param command
- * the command to set
- */
- public void setCommand(String command) {
- this.command = command;
- }
- }