ExternalNotificationListener.java
/*
* ExternalNotificationListener
*/
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
import java.io.File;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An external process that is called when new products arrive.
*
* The ExternalNotificationListener implements the Configurable interface and
* can use the following configuration parameters:
*
* <dl>
* <dt>command</dt>
* <dd>(Required) The command to execute. This must be an executable command and
* may include arguments. Any product specific arguments are appended at the end
* of command.</dd>
*
* <dt>storage</dt>
* <dd>(Required) A directory used to store all products. Each product is
* extracted into a separate directory within this directory and is referenced
* by the --directory=/path/to/directory argument when command is executed.</dd>
* </dl>
*
*/
public class ExternalNotificationListener extends DefaultNotificationListener {
/** Logging object. */
private static final Logger LOGGER = Logger
.getLogger(ExternalNotificationListener.class.getName());
/** Configuration parameter for storage directory product. */
public static final String STORAGE_NAME_PROPERTY = "storage";
/** Configuration parameter for command. */
public static final String COMMAND_PROPERTY = "command";
/** Argument used to pass signature to external process. */
public static final String SIGNATURE_ARGUMENT = "--signature=";
private static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";
/** Where products are stored in extracted form. */
private FileProductStorage storage;
/** Command that is executed after a product is stored. */
private String command;
/**
* Construct a new ExternalNotificationListener.
*
* The listener must be configured with a FileProductStorage and command to
* function.
*/
public ExternalNotificationListener() {
}
/**
* Configure an ExternalNotificationListener using a Config object.
*
* @param config
* the config containing a
*/
public void configure(Config config) throws Exception {
super.configure(config);
command = config.getProperty(COMMAND_PROPERTY);
if (command == null) {
throw new ConfigurationException("[" + getName()
+ "] 'command' is a required configuration property");
}
LOGGER.config("[" + getName() + "] command is '" + command + "'");
// storage references an object in the global configuration
String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
String storageDirectory = config
.getProperty(STORAGE_DIRECTORY_PROPERTY);
if (storageName == null && storageDirectory == null) {
throw new ConfigurationException("[" + getName()
+ "] 'storage' is a required configuration property.");
}
if (storageName != null) {
LOGGER.config("[" + getName() + "] loading FileProductStorage '"
+ storageName + "'");
storage = (FileProductStorage) Config.getConfig().getObject(
storageName);
if (storage == null) {
throw new ConfigurationException("[" + getName()
+ "] unable to load FileProductStorage '" + storageName
+ "'");
}
} else {
LOGGER.config("[" + getName() + "] using storage directory '"
+ storageDirectory + "'");
storage = new FileProductStorage(new File(storageDirectory));
}
}
/**
* Called when client is shutting down.
*/
public void shutdown() throws Exception {
super.shutdown();
// maybe make current process a member and kill process?
// or find way of detaching so client process can exit but product
// process can complete?
storage.shutdown();
}
/**
* Called after client has been configured and should begin processing.
*/
public void startup() throws Exception {
// no background threads to start or objects to create
storage.startup();
super.startup();
}
/**
* Append product arguments to the base command.
*
* @param product
* the product used to generate arguments.
* @return command as a string.
* @throws Exception if error occurs
*/
public String getProductCommand(final Product product) throws Exception {
StringBuffer buf = new StringBuffer(command);
ProductId id = product.getId();
// get path to product in storage, should be a directory
File productDirectory = storage.getProductFile(id);
buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT)
.append(productDirectory.getCanonicalPath());
buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
.append(id.getType());
buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
.append(id.getCode());
buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
.append(id.getSource());
buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
.append(XmlUtils.formatDate(id.getUpdateTime()));
buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
.append(product.getStatus());
if (product.isDeleted()) {
buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
}
if (product.getTrackerURL() != null) {
buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
.append(product.getTrackerURL().toString());
}
Map<String, String> props = product.getProperties();
Iterator<String> iter = props.keySet().iterator();
while (iter.hasNext()) {
String name = iter.next();
buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT)
.append(name).append("=")
.append(props.get(name).replace("\"", "\\\"")).append("\"");
}
Map<String, List<URI>> links = product.getLinks();
iter = links.keySet().iterator();
while (iter.hasNext()) {
String relation = iter.next();
Iterator<URI> iter2 = links.get(relation).iterator();
while (iter2.hasNext()) {
buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT)
.append(relation).append("=")
.append(iter2.next().toString());
}
}
Content content = product.getContents().get("");
if (content != null) {
buf.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
.append(content.getContentType());
}
if (product.getSignature() != null) {
buf.append(" ").append(SIGNATURE_ARGUMENT)
.append(product.getSignature());
}
return buf.toString();
}
/**
* Split a command string into a command array.
*
* This version uses a StringTokenizer to split arguments. Quoted arguments
* are supported (single or double), with quotes removed before passing to
* runtime. Double quoting arguments will preserve quotes when passing to
* runtime.
*
* @param command
* command to run.
* @return Array of arguments suitable for passing to
* Runtime.exec(String[]).
*/
protected static String[] splitCommand(final String command) {
List<String> arguments = new LinkedList<String>();
String currentArgument = null;
// use a tokenizer because that's how Runtime.exec does it currently...
StringTokenizer tokens = new StringTokenizer(command);
while (tokens.hasMoreTokens()) {
String token = tokens.nextToken();
if (currentArgument == null) {
currentArgument = token;
} else {
// continuing previous argument, that was split on whitespace
currentArgument = currentArgument + " " + token;
}
if (currentArgument.startsWith("\"")) {
// double quoted argument
if (currentArgument.endsWith("\"")) {
// that has balanced quotes
// remove quotes and add argument
currentArgument = currentArgument.substring(1,
currentArgument.length() - 1);
} else {
// unbalanced quotes, keep going
continue;
}
} else if (currentArgument.startsWith("'")) {
// single quoted argument
if (currentArgument.endsWith("'")) {
// that has balanced quotes
// remove quotes and add argument
currentArgument = currentArgument.substring(1,
currentArgument.length() - 1);
} else {
// unbalanced quotes, keep going
continue;
}
}
arguments.add(currentArgument);
currentArgument = null;
}
if (currentArgument != null) {
// weird, but add argument anyways
arguments.add(currentArgument);
}
return arguments.toArray(new String[] {});
}
/**
* Call the external process for this product.
*
* @param product Product
* @throws Exception if error occurs
*/
public void onProduct(final Product product) throws Exception {
// store product
try {
storage.storeProduct(product);
} catch (ProductAlreadyInStorageException e) {
LOGGER.info("[" + getName() + "] product already in storage "
+ product.getId().toString());
}
// now run command
String productCommand = null;
Process process = null;
int exitValue = -1;
try {
productCommand = getProductCommand(product);
LOGGER.info("[" + getName() + "] running command " + productCommand);
process = Runtime.getRuntime().exec(productCommand);
// inline product content, may or may not be null
Content content = product.getContents().get("");
if (content != null) {
StreamUtils.transferStream(content.getInputStream(),
process.getOutputStream());
} else {
// need to close process stdin either way
StreamUtils.closeStream(process.getOutputStream());
}
// maybe log/capture process input/error streams
// or switch to "Command"
exitValue = process.waitFor();
} catch (Exception e) {
if (process != null) {
// make sure to kill zombies
process.destroy();
}
// signal that process did not exit normally
exitValue = -1;
// give subclasses chance to handle exception
commandException(product, productCommand, e);
}
// if process exited normally
if (exitValue != -1) {
// give subclasses chance to handle exitValue, which may be non-zero
commandComplete(product, productCommand, exitValue);
}
}
/**
* Called when the command finishes executing normally.
*
* This implementation throws a NotificationListenerException if the
* exitValue is non-zero.
*
* @param product
* the product being processed.
* @param command
* the generated command, as a string.
* @param exitValue
* the exit status of the process.
* @throws Exception
* When re-notification should occur, based on maxTries, or none
* if done.
*/
public void commandComplete(final Product product, final String command,
final int exitValue) throws Exception {
LOGGER.info("[" + getName() + "] command '" + command
+ "' exited with status '" + exitValue + "'");
// send heartbeat info
HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
Integer.toString(exitValue));
if (exitValue != 0) {
throw new NotificationListenerException("[" + getName()
+ "] command exited with status " + exitValue);
}
}
/**
* Called when an exception occurs while running command.
*
* This implementation throws a NotificationListenerException with exception
* as the cause.
*
* @param product
* product being processed
* @param productCommand
* command that was built
* @param exception
* exception that was thrown during execution. This will be an
* InterruptedException if the process timed out.
* @throws Exception
* When re-notification should occur, based on maxTries, or none
* if done.
*/
public void commandException(final Product product,
final String productCommand, final Exception exception)
throws Exception {
if (exception instanceof InterruptedException) {
LOGGER.warning("[" + getName() + "] command '" + productCommand
+ "' timed out");
} else {
LOGGER.log(Level.WARNING, "[" + getName()
+ "] exception running command '" + productCommand + "'",
exception);
}
// send heartbeat info
HeartbeatListener.sendHeartbeatMessage(getName(), "exception",
productCommand);
HeartbeatListener.sendHeartbeatMessage(getName(), "exception class",
exception.getClass().getName());
throw new NotificationListenerException(exception);
}
/**
* @return the storage
*/
public FileProductStorage getStorage() {
return storage;
}
/**
* @param storage
* the storage to set
*/
public void setStorage(FileProductStorage storage) {
this.storage = storage;
}
/**
* @return the command
*/
public String getCommand() {
return command;
}
/**
* @param command
* the command to set
*/
public void setCommand(String command) {
this.command = command;
}
}