// ExternalIndexerListener.java

/*
 * ExternalIndexerListener
 */
package gov.usgs.earthquake.indexer;

import gov.usgs.earthquake.distribution.CLIProductBuilder;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.ExternalNotificationListener;
import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.indexer.IndexerChange.IndexerChangeType;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * ExternalIndexerListener triggers external, non-Java listener processes.
 *
 * Provides a translation to a command-line interface
 * for the product indexer to speak with external, non-Java listeners.
 *
 * As a child-class of the AbstractListener, this also accepts the following
 * configuration parameters:
 *
 * <dl>
 * <dt>command</dt>
 * <dd>(Required) The command to execute. This must be an executable command and
 * may include arguments. Any product-specific arguments are appended at the end
 * of command.</dd>
 *
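 * <dt>storage</dt>
 * <dd>(Required unless storageDirectory is set) Name of a configured
 * FileProductStorage used to store all products. Each product is
 * extracted into a separate directory within this storage and is referenced
 * by the --directory=/path/to/directory argument when command is executed.</dd>
 *
 * <dt>storageDirectory</dt>
 * <dd>(Required unless storage is set) Path of a directory to use for product
 * storage, instead of referencing a named storage object.</dd>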
 *
 * <dt>processUnassociated</dt>
 * <dd>(Optional, Default = false) Whether or not to process unassociated
 * products. Valid values are "true" and "false".</dd>
 *
 * <dt>processPreferredOnly</dt>
 * <dd>(Optional, Default = false) Whether or not to process only preferred
 * products of the type accepted by this listener. Valid values are "true" and
 * "false".</dd>
 *
 * <dt>autoArchive</dt>
 * <dd>(Optional, Default = true when configured from a Config object) Whether
 * or not to archive products from storage when they are archived by the
 * indexer. Valid values are "true" and "false".</dd>
 *
 * </dl>
 */
public class ExternalIndexerListener extends DefaultIndexerListener {

	/** Logger used for all messages emitted by this listener. */
	private static final Logger LOGGER = Logger
			.getLogger(ExternalIndexerListener.class.getName());

	/** Argument prefix for the type of indexer change that occurred. */
	public static final String EVENT_ACTION_ARGUMENT = "--action=";
	/** Argument prefix for the comma-separated list of event ids. */
	public static final String EVENT_IDS_ARGUMENT = "--eventids=";

	/** Argument prefix for the preferred event id. */
	public static final String PREFERRED_ID_ARGUMENT = "--preferred-eventid=";
	/** Argument prefix for the preferred event source. */
	public static final String PREFERRED_EVENTSOURCE_ARGUMENT = "--preferred-eventsource=";
	/** Argument prefix for the preferred event source code. */
	public static final String PREFERRED_EVENTSOURCECODE_ARGUMENT = "--preferred-eventsourcecode=";
	/** Argument prefix for the preferred magnitude. */
	public static final String PREFERRED_MAGNITUDE_ARGUMENT = "--preferred-magnitude=";
	/** Argument prefix for the preferred longitude. */
	public static final String PREFERRED_LONGITUDE_ARGUMENT = "--preferred-longitude=";
	/** Argument prefix for the preferred latitude. */
	public static final String PREFERRED_LATITUDE_ARGUMENT = "--preferred-latitude=";
	/** Argument prefix for the preferred depth. */
	public static final String PREFERRED_DEPTH_ARGUMENT = "--preferred-depth=";
	/** Argument prefix for the preferred event time. */
	public static final String PREFERRED_ORIGIN_TIME_ARGUMENT = "--preferred-eventtime=";
	/** Configuration parameter naming the product storage object to load. */
	public static final String STORAGE_NAME_PROPERTY = "storage";

	/** Short circuit to directly configure storage directory. */
	public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

	/** Configuration parameter for command. */
	public static final String COMMAND_PROPERTY = "command";

	/** Configuration parameter for autoArchive. */
	public static final String AUTO_ARCHIVE_PROPERTY = "autoArchive";
	/**
	 * Value used by configure when the autoArchive property is not set.
	 * Note this is "true", while the autoArchive field itself starts as false.
	 */
	public static final String AUTO_ARCHIVE_DEFAULT = "true";

	/** Argument used to pass signature to external process. */
	public static final String SIGNATURE_ARGUMENT = "--signature=";

	/** Where products are stored in extracted form. */
	private FileProductStorage storage;

	/** Command that is executed after a product is stored. */
	private String command;

	/**
	 * Archive products from listener storage when archived by indexer.
	 * Remains false until configure or setAutoArchive assigns a value.
	 */
	private boolean autoArchive = false;

	/**
	 * Construct a new ExternalIndexerListener object
	 *
	 * The listener must be configured with a FileProductStorage and a command
	 * to function.
	 */
	public ExternalIndexerListener() {
		super();
	}

	/**
	 * Handle an event from the indexer.
	 *
	 * When the event is accepted by this listener, its product is stored in
	 * the listener storage and the configured command is run once for every
	 * accepted indexer change. Independently of that, when autoArchive is
	 * enabled, products reported as archived (individually or as part of an
	 * archived event) are removed from the listener storage.
	 *
	 * @param change the event received from the indexer
	 * @throws Exception if storing, running the command, or archiving fails
	 * @see gov.usgs.earthquake.indexer.IndexerListener#onIndexerEvent(gov.usgs.
	 *      earthquake.indexer.IndexerEvent)
	 */
	public void onIndexerEvent(IndexerEvent change) throws Exception {
		if (accept(change)) {
			// the product must be in storage before any command runs
			final Product storedProduct = storeProduct(change.getProduct());

			for (final IndexerChange indexerChange : change.getIndexerChanges()) {
				// only run the command for changes this listener accepts
				if (accept(change, indexerChange)) {
					final String indexerCommand = getProductSummaryCommand(
							change, indexerChange);
					runProductCommand(indexerCommand, storedProduct);
				}
			}
		}

		if (!autoArchive) {
			return;
		}

		final ProductStorage listenerStorage = getStorage();
		for (final IndexerChange archiveChange : change.getIndexerChanges()) {
			final IndexerChangeType changeType = archiveChange.getType();

			if (changeType == IndexerChangeType.PRODUCT_ARCHIVED) {
				// a single product is being archived
				if (change.getSummary() != null) {
					final ProductId archivedId = change.getSummary().getId();
					LOGGER.log(Level.FINER, "[" + getName()
							+ "] auto archiving product "
							+ archivedId.toString());
					listenerStorage.removeProduct(archivedId);
				}
			} else if (changeType == IndexerChangeType.EVENT_ARCHIVED) {
				// every product on the archived event is removed
				final Event archivedEvent = archiveChange.getOriginalEvent();
				LOGGER.log(Level.FINER, "[" + getName()
						+ "] auto archiving event "
						+ archivedEvent.getEventId() + " products");
				for (final ProductSummary archivedSummary : archivedEvent
						.getAllProductList()) {
					final ProductId archivedId = archivedSummary.getId();
					LOGGER.log(Level.FINER, "[" + getName()
							+ "] auto archiving product "
							+ archivedId.toString());
					listenerStorage.removeProduct(archivedId);
				}
			}
		}
	}

	/**
	 * Copy a product into the listener storage and read it back.
	 *
	 * A product that is already present in storage is not treated as an
	 * error; the stored copy is loaded and returned instead.
	 *
	 * @param product product to be stored, may be null.
	 * @return the product as read from the listener storage, or null when
	 *         product is null.
	 * @throws Exception if error occurs
	 */
	public Product storeProduct(final Product product) throws Exception {
		if (product == null) {
			LOGGER.finer("[" + getName()
					+ "] Change product is null. Probably archiving.");
			return null;
		}

		try {
			getStorage().storeProduct(product);
			return getStorage().getProduct(product.getId());
		} catch (ProductAlreadyInStorageException alreadyStored) {
			LOGGER.info("[" + getName() + "] product already in storage");
			// not an error: fall back to the copy already held locally
			return getStorage().getProduct(product.getId());
		}
	}

	/**
	 * Run a product command and wait for it to finish.
	 *
	 * The command is started with Runtime.exec(String). When the listener
	 * timeout is greater than zero, a timer destroys the process after that
	 * many milliseconds; the timer is armed before any content is written to
	 * the process so that a command that never reads stdin cannot block this
	 * method indefinitely. All process streams are closed before returning,
	 * and the process is destroyed if this method exits before the process
	 * has completed (for example because of an IO error or an interrupt).
	 *
	 * NOTE(review): stdout of the command is never read, and stderr is only
	 * read after the process exits. The java.lang.Process documentation warns
	 * that a subprocess may block when its output is not read promptly, so a
	 * very chatty command may stall until the timeout. Confirm whether
	 * commands are expected to produce large output.
	 *
	 * @param command command and arguments.
	 * @param product product, when set and empty content (path "") is defined,
	 *        the content is provided to the command on stdin.
	 * @throws Exception if error occurs
	 */
	public void runProductCommand(final String command, final Product product) throws Exception {
		// execute
		LOGGER.info("[" + getName() + "] running command " + command);
		final Process process = Runtime.getRuntime().exec(command);

		Timer commandTimer = null;
		boolean completed = false;
		try {
			try {
				final long timeout = this.getTimeout();
				if (timeout > 0) {
					// Schedule process destruction for timeout milliseconds
					// in the future, before stdin is written
					commandTimer = new Timer();
					commandTimer.schedule(new TimerTask() {
						public void run() {
							LOGGER.warning("[" + getName()
									+ "] command timeout '" + command
									+ "', destroying process.");
							process.destroy();
						}
					}, timeout);
				}

				try {
					// Stream content over stdin if it exists
					if (product != null) {
						final Content content = product.getContents().get("");
						if (content != null) {
							StreamUtils.transferStream(content.getInputStream(),
									process.getOutputStream());
						}
					}
				} finally {
					// Always close stdin so the command sees end of input
					StreamUtils.closeStream(process.getOutputStream());
				}

				// Wait for process to complete
				process.waitFor();
				completed = true;
			} finally {
				if (commandTimer != null) {
					// Cancel the timer if it was not triggered
					commandTimer.cancel();
				}
			}

			final int exitValue = process.exitValue();
			LOGGER.info("[" + getName() + "] command '" + command
					+ "' exited with status '" + exitValue + "'");
			if (exitValue != 0) {
				byte[] errorOutput = StreamUtils.readStream(process.getErrorStream());
				LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" +
						new String(errorOutput) + "'");
			}

			// send heartbeat info
			HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
			HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
					Integer.toString(exitValue));
		} finally {
			if (!completed) {
				// do not leave an orphaned process behind on failure
				process.destroy();
			}
			// release the remaining pipes held by the process
			StreamUtils.closeStream(process.getInputStream());
			StreamUtils.closeStream(process.getErrorStream());
		}
	}

	/**
	 * Build the full command line for one indexer change.
	 *
	 * The command is built from the change's new event (or, for an archived
	 * event with no new event, its original event) and the summary of the
	 * indexer event, followed by an argument naming the type of change.
	 *
	 * @param change
	 *            The IndexerEvent received by the ExternalIndexerListener
	 * @param indexerChange
	 *            The IndexerChange
	 * @return the command to execute with its arguments as a string
	 * @throws Exception if error occurs
	 */
	public String getProductSummaryCommand(IndexerEvent change,
			IndexerChange indexerChange) throws Exception {
		final ProductSummary productSummary = change.getSummary();

		Event commandEvent = indexerChange.getNewEvent();
		if (commandEvent == null) {
			// an archived event has no new event; report the original one
			if (indexerChange.getType() == IndexerChangeType.EVENT_ARCHIVED) {
				commandEvent = indexerChange.getOriginalEvent();
			}
		}

		// the action argument tells the external process what occurred
		return getProductSummaryCommand(commandEvent, productSummary)
				+ " "
				+ ExternalIndexerListener.EVENT_ACTION_ARGUMENT
				+ indexerChange.getType().toString();
	}

	/**
	 * Get the command for a specific event and summary.
	 *
	 * Starts from the configured command, then appends event arguments (when
	 * event is not null) and product summary arguments (when summary is not
	 * null). When the summarized product can be loaded from listener storage,
	 * content and signature arguments are appended as well.
	 *
	 * @param event Specific event, may be null
	 * @param summary Specific product summary, may be null
	 * @return command line arguments as a string.
	 * @throws Exception if error occurs
	 */
	public String getProductSummaryCommand(Event event, ProductSummary summary) throws Exception {
		final StringBuilder indexerCommand = new StringBuilder(getCommand());

		if (event != null) {
			indexerCommand.append(getEventArguments(event));
		}
		if (summary != null) {
			indexerCommand.append(getProductSummaryArguments(summary));
		}

		Product product = null;
		// without a summary there is no product id to look up; check this
		// explicitly rather than relying on a caught NullPointerException
		if (summary != null) {
			try {
				product = getStorage().getProduct(summary.getId());
			} catch (Exception e) {
				// when archiving product may not exist
				LOGGER.log(
						Level.FINE,
						"Exception retreiving product from storage, probably archiving",
						e);
			}
		}

		if (product != null) {
			// Can only add these arguments if there is a product
			final Content content = product.getContents().get("");
			if (content != null) {
				// content with an empty path is what gets streamed on stdin
				indexerCommand.append(" ").append(
						CLIProductBuilder.CONTENT_ARGUMENT);
				indexerCommand.append(" ")
						.append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
						.append(content.getContentType());
			}

			if (product.getSignature() != null) {
				indexerCommand
						.append(" ")
						.append(ExternalNotificationListener.SIGNATURE_ARGUMENT)
						.append(product.getSignature());
			}
		}

		return indexerCommand.toString();
	}

	/**
	 * Get command line arguments for an event.
	 *
	 * The result starts with a space and contains the preferred id, source
	 * and source code, the comma-separated list of all event ids (each id is
	 * a source immediately followed by a source code), and the preferred
	 * magnitude, latitude, longitude, depth and event time. When the event
	 * has no time, the event time argument carries the text "null".
	 *
	 * @param event the event
	 * @return command line arguments
	 */
	public String getEventArguments(final Event event) {
		final StringBuilder buf = new StringBuilder();

		final EventSummary eventSummary = event.getEventSummary();
		buf.append(" ")
				.append(ExternalIndexerListener.PREFERRED_ID_ARGUMENT)
				.append(eventSummary.getId());
		buf.append(" ")
				.append(ExternalIndexerListener.PREFERRED_EVENTSOURCE_ARGUMENT)
				.append(eventSummary.getSource());
		buf.append(" ")
				.append(ExternalIndexerListener.PREFERRED_EVENTSOURCECODE_ARGUMENT)
				.append(eventSummary.getSourceCode());

		final Map<String, List<String>> eventids = event.getAllEventCodes(true);
		buf.append(" ").append(EVENT_IDS_ARGUMENT);
		// Emit the separator before every id except the first, so a source
		// with an empty code list can never leave a dangling comma behind.
		String separator = "";
		for (final Map.Entry<String, List<String>> sourceCodes : eventids
				.entrySet()) {
			final String source = sourceCodes.getKey();
			for (final String sourceCode : sourceCodes.getValue()) {
				buf.append(separator).append(source).append(sourceCode);
				separator = ",";
			}
		}

		buf.append(" ").append(PREFERRED_MAGNITUDE_ARGUMENT)
				.append(eventSummary.getMagnitude());
		buf.append(" ").append(PREFERRED_LATITUDE_ARGUMENT)
				.append(eventSummary.getLatitude());
		buf.append(" ").append(PREFERRED_LONGITUDE_ARGUMENT)
				.append(eventSummary.getLongitude());
		buf.append(" ").append(PREFERRED_DEPTH_ARGUMENT)
				.append(eventSummary.getDepth());

		// a missing time is deliberately left as null, which appends "null"
		String eventTime = null;
		if (event.getTime() != null) {
			eventTime = XmlUtils.formatDate(event.getTime());
		}
		buf.append(" ").append(PREFERRED_ORIGIN_TIME_ARGUMENT)
				.append(eventTime);

		return buf.toString();
	}

	/**
	 * Build the command line arguments that describe a product summary.
	 *
	 * The result starts with a space. It holds the storage directory of the
	 * product (only when that directory exists), the type, code, source,
	 * update time and status of the summary, a delete flag for deleted
	 * products, the tracker URL when one is set, and one argument for every
	 * property and every link of the summary.
	 *
	 * @param summary the product summary
	 * @return command line arguments
	 * @throws IOException if IO error occurs
	 */
	public String getProductSummaryArguments(final ProductSummary summary) throws IOException {
		final StringBuilder arguments = new StringBuilder();

		// the directory argument is only present for products in storage
		final File storedDirectory = getStorage().getProductFile(summary.getId());
		if (storedDirectory.exists()) {
			arguments.append(" ");
			arguments.append(CLIProductBuilder.DIRECTORY_ARGUMENT);
			arguments.append(storedDirectory.getCanonicalPath());
		}

		// identifying arguments taken straight from the summary
		arguments.append(" ");
		arguments.append(CLIProductBuilder.TYPE_ARGUMENT);
		arguments.append(summary.getType());
		arguments.append(" ");
		arguments.append(CLIProductBuilder.CODE_ARGUMENT);
		arguments.append(summary.getCode());
		arguments.append(" ");
		arguments.append(CLIProductBuilder.SOURCE_ARGUMENT);
		arguments.append(summary.getSource());
		arguments.append(" ");
		arguments.append(CLIProductBuilder.UPDATE_TIME_ARGUMENT);
		arguments.append(XmlUtils.formatDate(summary.getUpdateTime()));
		arguments.append(" ");
		arguments.append(CLIProductBuilder.STATUS_ARGUMENT);
		arguments.append(summary.getStatus());

		if (summary.isDeleted()) {
			arguments.append(" ");
			arguments.append(CLIProductBuilder.DELETE_ARGUMENT);
		}

		// the tracker URL is optional
		if (summary.getTrackerURL() != null) {
			arguments.append(" ");
			arguments.append(CLIProductBuilder.TRACKER_URL_ARGUMENT);
			arguments.append(summary.getTrackerURL());
		}

		// each property is wrapped in double quotes, with any double quotes
		// inside the value escaped by a backslash
		for (final Map.Entry<String, String> property : summary
				.getProperties().entrySet()) {
			arguments.append(" \"");
			arguments.append(CLIProductBuilder.PROPERTY_ARGUMENT);
			arguments.append(property.getKey());
			arguments.append("=");
			arguments.append(property.getValue().replace("\"", "\\\""));
			arguments.append("\"");
		}

		// one link argument per URI of every relation
		for (final Map.Entry<String, List<URI>> relationLinks : summary
				.getLinks().entrySet()) {
			final String relation = relationLinks.getKey();
			for (final URI link : relationLinks.getValue()) {
				arguments.append(" ");
				arguments.append(CLIProductBuilder.LINK_ARGUMENT);
				arguments.append(relation);
				arguments.append("=");
				arguments.append(link.toString());
			}
		}

		return arguments.toString();
	}

	/**
	 * Configure an ExternalIndexerListener using a Config object.
	 *
	 * After the superclass has been configured, this reads the required
	 * 'command' property, resolves product storage from either the 'storage'
	 * property (the name of an object in the global configuration, which
	 * takes precedence) or the 'storageDirectory' property (a directory
	 * path), and reads the optional 'autoArchive' property.
	 *
	 * @param config
	 *            the config containing the properties for this listener
	 * @throws ConfigurationException
	 *             if 'command' is missing, if neither 'storage' nor
	 *             'storageDirectory' is set, or if the named storage cannot
	 *             be loaded or is not a FileProductStorage
	 * @throws Exception
	 *             if the superclass configuration fails
	 */
	@Override
	public void configure(Config config) throws Exception {
		super.configure(config);

		command = config.getProperty(COMMAND_PROPERTY);
		if (command == null) {
			throw new ConfigurationException("[" + getName()
					+ "] 'command' is a required configuration property");
		}
		LOGGER.config("[" + getName() + "] command is '" + command + "'");

		// storage references an object in the global configuration
		String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
		String directoryName = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
		if (storageName == null && directoryName == null) {
			throw new ConfigurationException("[" + getName()
					+ "] one of 'storage' or 'storageDirectory' is required");
		}

		if (storageName != null) {
			LOGGER.config("[" + getName() + "] loading FileProductStorage '"
					+ storageName + "'");
			final Object configuredStorage = Config.getConfig().getObject(
					storageName);
			if (configuredStorage == null) {
				throw new ConfigurationException("[" + getName()
						+ "] unable to load FileProductStorage '" + storageName
						+ "'");
			}
			// report a wrongly typed object as a configuration problem
			// instead of letting the cast fail with a ClassCastException
			if (!(configuredStorage instanceof FileProductStorage)) {
				throw new ConfigurationException("[" + getName()
						+ "] '" + storageName
						+ "' is not a FileProductStorage");
			}
			storage = (FileProductStorage) configuredStorage;
		} else {
			LOGGER.config("[" + getName() + "] using storage directory '"
					+ directoryName + "'");
			storage = new FileProductStorage(new File(directoryName));
			storage.setName(getName() + "-storage");
		}

		autoArchive = Boolean.parseBoolean(config.getProperty(
				AUTO_ARCHIVE_PROPERTY, AUTO_ARCHIVE_DEFAULT));
		LOGGER.config("[" + getName() + "] autoArchive = " + autoArchive);
	}

	/**
	 * Called when client is shutting down.
	 *
	 * Shuts down the superclass first and then the listener storage. The
	 * storage is shut down even when the superclass shutdown fails, and a
	 * listener whose storage was never set is tolerated.
	 *
	 * @throws Exception if the superclass or the storage fails to shut down
	 */
	@Override
	public void shutdown() throws Exception {
		try {
			super.shutdown();
			// maybe make current process a member and kill process?
			// or find way of detaching so client process can exit but product
			// process can complete?
		} finally {
			if (storage != null) {
				storage.shutdown();
			}
		}
	}

	/**
	 * Called after client has been configured and should begin processing.
	 *
	 * Starts the listener storage first and then the superclass.
	 *
	 * @throws Exception if the storage or the superclass fails to start
	 */
	public void startup() throws Exception {
		// storage has to be running before the superclass is started
		this.storage.startup();
		super.startup();
	}

	/**
	 * Get the storage that holds products for the external command.
	 *
	 * @return the storage
	 */
	public FileProductStorage getStorage() {
		return this.storage;
	}

	/**
	 * Set the storage that holds products for the external command.
	 *
	 * @param storage
	 *            the storage to set
	 */
	public void setStorage(final FileProductStorage storage) {
		this.storage = storage;
	}

	/**
	 * Get the base command, without any product-specific arguments.
	 *
	 * @return the command
	 */
	public String getCommand() {
		return this.command;
	}

	/**
	 * Set the base command, without any product-specific arguments.
	 *
	 * @param command
	 *            the command to set
	 */
	public void setCommand(final String command) {
		this.command = command;
	}

	/**
	 * Check whether archived products are also removed from listener storage.
	 *
	 * @return the autoArchive
	 */
	public boolean isAutoArchive() {
		return this.autoArchive;
	}

	/**
	 * Set whether archived products are also removed from listener storage.
	 *
	 * @param autoArchive
	 *            the autoArchive to set
	 */
	public void setAutoArchive(final boolean autoArchive) {
		this.autoArchive = autoArchive;
	}

}