EIDSInputWedge.java

package gov.usgs.earthquake.eids;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.bind.JAXBException;

import org.xml.sax.SAXParseException;

import gov.usgs.earthquake.distribution.Bootstrappable;
import gov.usgs.earthquake.distribution.CLIProductBuilder;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.ProductBuilder;
import gov.usgs.earthquake.distribution.ProductSender;
import gov.usgs.earthquake.distribution.SocketProductSender;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.quakeml.FileToQuakemlConverter;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.DirectoryPoller;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StringUtils;
import gov.usgs.util.CryptoUtils.Version;

/**
 * Read messages from files or a poll directory, and push products into PDL.
 *
 * This supports EIDS/QDDS style polling. The input messages are converted to
 * Quakeml using the FileToQuakemlConverter interface, then sent as Quakeml
 * based products.
 *
 * Much of the configuration can be supplied using either a configuration file,
 * or command line arguments.
 */
public class EIDSInputWedge extends ProductBuilder implements Runnable,
		Bootstrappable {

	/**
	 * Global reference to arguments array, when EIDSInputWedge is run via
	 * Bootstrap. Assigned at the start of {@link #run(String[])}.
	 */
	public static String[] ARGS = null;

	/** Logger for this class. */
	private static final Logger LOGGER = Logger.getLogger(EIDSInputWedge.class
			.getName());

	/** Configuration property naming the parser class. */
	public static final String PARSER_CLASS_PROPERTY = "parserClass";
	/** Default parser class name, as shown in the usage text. */
	public static final String DEFAULT_PARSER_CLASS = "gov.usgs.earthquake.event.QuakemlToQuakemlConverter";

	/** Configuration property for the poll directory. */
	public static final String POLLDIR_PROPERTY = "directory";
	/** Default poll directory. */
	public static final String DEFAULT_POLLDIR = "polldir";
	/** Directory whose files are processed by the poll loop. */
	private File polldir = new File(DEFAULT_POLLDIR);

	/** Configuration property for the storage directory. */
	public static final String STORAGEDIR_PROPERTY = "oldinputdir";
	/** Default storage directory. */
	public static final String DEFAULT_STORAGEDIR = "oldinput";
	/** Directory that files are moved into after they were processed. */
	private File storagedir = new File(DEFAULT_STORAGEDIR);

	/** Configuration property for the error directory. */
	public static final String ERRORDIR_PROPERTY = "errordir";
	/** Default error directory. */
	public static final String DEFAULT_ERRORDIR = "errordir";
	/** Directory that files are moved into when processing fails. */
	private File errordir = new File(DEFAULT_ERRORDIR);

	/** Configuration property for validate. */
	public static final String VALIDATE_PROPERTY = "validate";
	/** Default value of the validate property. */
	public static final String DEFAULT_VALIDATE = "false";

	/** Configuration property for sendOriginWhenPhasesExist. */
	public static final String SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY = "sendOriginWhenPhasesExist";
	/** Default value of the sendOriginWhenPhasesExist property. */
	public static final String DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST = "false";

	/** Configuration property for sendMechanismWhenPhasesExist. */
	public static final String SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY = "sendMechanismWhenPhasesExist";
	/** Default value of the sendMechanismWhenPhasesExist property. */
	public static final String DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST = "false";

	/** Creates products from input files; defaults to a QuakemlProductCreator. */
	private ProductCreator productCreator = new QuakemlProductCreator();

	/** Whether created products should be converted to internal types. */
	public static final String CREATE_INTERNAL_PRODUCTS_PROPERTY = "createInternalProducts";
	/** Default value of the createInternalProducts property. */
	public static final String DEFAULT_CREATE_INTERNAL_PRODUCTS = "false";
	/** When true, parseAndSend prefixes each product type with "internal-". */
	private boolean createInternalProducts = false;

	/** Whether created products should be converted to scenario types. */
	public static final String CREATE_SCENARIO_PRODUCTS_PROPERTY = "createScenarioProducts";
	/** Default value of the createScenarioProducts property. */
	public static final String DEFAULT_CREATE_SCENARIO_PRODUCTS = "false";
	/** When true, parseAndSend suffixes each product type with "-scenario". */
	private boolean createScenarioProducts = false;

	/**
	 * Directory polling object. Only exposed through its getter and setter in
	 * this class; the poll loop in {@link #run()} does not use it.
	 */
	private DirectoryPoller directoryPoller;

	/** Configuration property for the poll interval. */
	public static final String POLLINTERVAL_PROPERTY = "interval";
	/** Default poll interval, in milliseconds. */
	public static final String DEFAULT_POLLINTERVAL = "1000";
	/** Milliseconds between the start of consecutive polls. */
	private long pollInterval = 1000L;

	/** Configuration property for pollCarefully. */
	public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
	/** Default value of the pollCarefully property. */
	public static final String DEFAULT_POLL_CAREFULLY = "false";
	/** When true, the poll loop skips files modified within the last pollInterval ms. */
	private boolean pollCarefully = false;

	/** Configuration property for doBufferFix. */
	public static final String DO_BUFFER_FIX_PROPERTY = "doBufferFix";
	/** Default value of the doBufferFix property. */
	public static final String DEFAULT_DO_BUFFER_FIX = "true";
	/** Flag passed to the QuakemlProductCreator constructor in configure. */
	private boolean doBufferFix = true;

	/** Thread running {@link #run()}; created by startup, interrupted by shutdown. */
	private Thread pollThread = null;

	/**
	 * Empty constructor; fields keep their declared defaults.
	 * @throws Exception if error occurs
	 */
	public EIDSInputWedge() throws Exception {
	}

	/**
	 * Creates products from a file and sends each of them.
	 *
	 * For every product returned by the product creator: the product type is
	 * prefixed with "internal-" when createInternalProducts is set, suffixed
	 * with "-scenario" when createScenarioProducts is set, the optional
	 * attachments are added to the product contents, and the product is sent.
	 *
	 * @param file File containing products
	 * @param attachContent contents to attach to the generated product, keyed
	 *            by path; may be null or empty. Only allowed when exactly one
	 *            product is generated.
	 * @return Map from product id to the per-sender exceptions returned by
	 *         sendProduct
	 * @throws Exception if products cannot be created, or if attachments are
	 *             supplied and more than one product was generated
	 */
	public Map<ProductId, Map<ProductSender, Exception>> parseAndSend(
			final File file, final Map<String, Content> attachContent)
			throws Exception {

		final Map<ProductId, Map<ProductSender, Exception>> results = new HashMap<ProductId, Map<ProductSender, Exception>>();
		final List<Product> parsed = productCreator.getProducts(file);
		final boolean hasAttachments = attachContent != null
				&& attachContent.size() > 0;

		for (final Product product : parsed) {
			final ProductId productId = product.getId();

			// adjust product type before anything is sent
			if (createInternalProducts) {
				productId.setType("internal-" + productId.getType());
			}
			if (createScenarioProducts) {
				productId.setType(productId.getType() + "-scenario");
			}

			if (hasAttachments) {
				// attachments are ambiguous when several products exist
				if (parsed.size() > 1) {
					throw new Exception("Trying to attach files,"
							+ " generated more than 1 product");
				}
				product.getContents().putAll(attachContent);
			}

			// send, remembering whatever exceptions the senders reported
			results.put(product.getId(), sendProduct(product));
		}

		return results;
	}

	/**
	 * Parses and sends one file, logs send statistics, then moves the file
	 * out of the way.
	 *
	 * On success (no product failed on every sender) the file is moved into
	 * the storage directory as {@code name_inputtime}; the storage directory
	 * is created when missing, and the file is deleted when no storage
	 * directory is configured. On any error the file is moved into the error
	 * directory as {@code name_currenttime}, or deleted when no error
	 * directory is configured. This method does not throw; failures are
	 * logged.
	 *
	 * @param file to parse and look for errors
	 */
	public void onFile(File file) {
		Date inputtime = new Date();
		LOGGER.info("Reading file " + file.getName());

		try {
			Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
					file, null);

			// check how send went
			int numSenders = getProductSenders().size();
			int total = sendExceptions.size();
			int successful = 0;
			int partialFailures = 0;
			int totalFailures = 0;

			for (Map.Entry<ProductId, Map<ProductSender, Exception>> entry : sendExceptions
					.entrySet()) {
				ProductId sentId = entry.getKey();
				int numExceptions = entry.getValue().size();
				if (numExceptions == numSenders) {
					// every sender reported an exception
					totalFailures++;
					LOGGER.severe("Total failure sending product "
							+ sentId.toString());
				} else {
					// output built product id because it was sent at least once
					System.out.println(sentId.toString());

					if (numExceptions == 0) {
						successful++;
					} else {
						partialFailures++;
						LOGGER.warning("Partial failure sending product "
								+ sentId.toString());
					}
				}
			}

			LOGGER.info("generated " + total + " products: " + successful
					+ " sent, " + partialFailures + " partially sent, "
					+ totalFailures + " failed to send");

			if (totalFailures > 0) {
				// consider this failure, even if some products sent;
				// handled by the catch block below
				throw new Exception(totalFailures + " of " + total
						+ " products failed to send");
			}

			// succeeded, at least somewhat
			if (storagedir != null) {
				// move to oldinput directory, creating it when needed so the
				// rename does not fail just because the directory is missing
				if (!storagedir.exists()) {
					storagedir.mkdirs();
				}
				File storedfile = new File(storagedir, file.getName() + "_"
						+ inputtime.getTime());
				if (!file.renameTo(storedfile)) {
					LOGGER.warning("Unable to move processed file "
							+ file.getName() + " to " + storedfile.getPath());
				}
			} else {
				// no storage directory, mirror the error branch and remove
				FileUtils.deleteTree(file);
			}
		} catch (Exception e) {
			if (e instanceof JAXBException
					&& ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
				SAXParseException spe = (SAXParseException) ((JAXBException) e)
						.getLinkedException();
				LOGGER.warning("Parse error: " + spe.getMessage() + "; line="
						+ spe.getLineNumber() + ", column="
						+ spe.getColumnNumber());
			}

			if (errordir != null) {
				if (!errordir.exists()) {
					errordir.mkdirs();
				}
				File errorfile = new File(errordir, file.getName() + "_"
						+ new Date().getTime());

				// resolve a path for the log message; never lose the log
				// entry because the canonical path cannot be computed
				String errorpath;
				try {
					errorpath = errorfile.getCanonicalPath();
				} catch (Exception ignored) {
					errorpath = errorfile.getAbsolutePath();
				}
				LOGGER.log(Level.WARNING, "Errors processing file, moving to "
						+ errorpath, e);

				// move to error directory
				if (!file.renameTo(errorfile)) {
					LOGGER.warning("Unable to move file " + file.getName()
							+ " to " + errorpath);
				}
			} else {
				LOGGER.warning("Error processing file " + file.getName()
						+ ", and no error directory configured");
				FileUtils.deleteTree(file);
			}
		}
	}

	/**
	 * Configures this wedge from a Config object, after configuring the
	 * superclass.
	 *
	 * Reads the parser class, validation flag, phase-related send flags, poll
	 * directory and interval, storage and error directories, and the
	 * internal/scenario product flags. Each setting falls back to its
	 * DEFAULT_* constant, except the parser class: when it is not configured
	 * the current product creator is kept.
	 *
	 * @param config configuration to read
	 * @throws Exception if the superclass fails to configure, the parser class
	 *             cannot be instantiated, or (ConfigurationException) the
	 *             parser class is neither a ProductCreator nor a
	 *             FileToQuakemlConverter
	 */
	@Override
	public void configure(Config config) throws Exception {
		super.configure(config);

		// read first: the flag is passed to the QuakemlProductCreator
		// constructor below, so it must be known before the parser is built
		doBufferFix = Boolean.valueOf(config
				.getProperty(DO_BUFFER_FIX_PROPERTY,
								DEFAULT_DO_BUFFER_FIX));
		LOGGER.config("doBufferFix = " + doBufferFix);

		String parserClassName = config.getProperty(PARSER_CLASS_PROPERTY);
		if (parserClassName == null) {
			// keep the current productCreator
			LOGGER.config("Using QuakemlToProductConverter");
		} else {
			Object parserObj = Class.forName(parserClassName)
					.getConstructor().newInstance();
			if (parserObj instanceof ProductCreator) {
				productCreator = (ProductCreator) parserObj;
			} else if (parserObj instanceof FileToQuakemlConverter) {
				QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(doBufferFix);
				quakemlCreator.setConverter((FileToQuakemlConverter) parserObj);
				productCreator = quakemlCreator;
			} else {
				throw new ConfigurationException("configured parser class "
						+ parserClassName + " does not implement "
						+ FileToQuakemlConverter.class.getName());
			}
			LOGGER.config("Using parser class " + parserClassName);
		}

		// Boolean.valueOf parses the configured text; Boolean.getBoolean
		// would instead look up a JVM system property with that name
		boolean validate = Boolean.valueOf(config.getProperty(
				VALIDATE_PROPERTY, DEFAULT_VALIDATE));
		productCreator.setValidate(validate);
		LOGGER.config("Validation " + (validate ? "enabled" : "disabled"));

		boolean sendOriginWhenPhasesExist = Boolean.valueOf(config
				.getProperty(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY,
						DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST));
		if (productCreator instanceof EQMessageProductCreator) {
			((EQMessageProductCreator) productCreator)
					.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
		} else if (productCreator instanceof QuakemlProductCreator) {
			((QuakemlProductCreator) productCreator)
					.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
		} else if (sendOriginWhenPhasesExist) {
			// custom ProductCreator implementations have no such setter
			LOGGER.warning(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY
					+ " is not supported for "
					+ productCreator.getClass().getName());
		}
		LOGGER.config("sendOriginWhenPhasesExist = "
				+ sendOriginWhenPhasesExist);

		boolean sendMechanismWhenPhasesExist = Boolean.valueOf(config
				.getProperty(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY,
						DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST));
		if (productCreator instanceof EQMessageProductCreator) {
			if (sendMechanismWhenPhasesExist) {
				LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY
						+ " is not supported for EQMessageProductCreator");
			}
		} else if (productCreator instanceof QuakemlProductCreator) {
			((QuakemlProductCreator) productCreator)
					.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
		} else if (sendMechanismWhenPhasesExist) {
			LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY
					+ " is not supported for "
					+ productCreator.getClass().getName());
		}
		LOGGER.config("sendMechanismWhenPhasesExist = "
				+ sendMechanismWhenPhasesExist);

		polldir = new File(
				config.getProperty(POLLDIR_PROPERTY, DEFAULT_POLLDIR));
		LOGGER.config("Using poll directory " + polldir.getCanonicalPath());

		pollInterval = Long.valueOf(config.getProperty(POLLINTERVAL_PROPERTY,
				DEFAULT_POLLINTERVAL));
		LOGGER.config("Using poll interval " + pollInterval + "ms");

		pollCarefully = Boolean.valueOf(config.getProperty(POLL_CAREFULLY_PROPERTY,
				DEFAULT_POLL_CAREFULLY));
		LOGGER.config("Poll carefully = " + pollCarefully);

		storagedir = new File(config.getProperty(STORAGEDIR_PROPERTY,
				DEFAULT_STORAGEDIR));
		LOGGER.config("Using oldinput directory "
				+ storagedir.getCanonicalPath());

		errordir = new File(config.getProperty(ERRORDIR_PROPERTY,
				DEFAULT_ERRORDIR));
		LOGGER.config("Using error directory " + errordir.getCanonicalPath());

		createInternalProducts = Boolean.valueOf(config
				.getProperty(CREATE_INTERNAL_PRODUCTS_PROPERTY,
						DEFAULT_CREATE_INTERNAL_PRODUCTS));
		LOGGER.config("createInternalProducts = " + createInternalProducts);

		createScenarioProducts = Boolean.valueOf(config
				.getProperty(CREATE_SCENARIO_PRODUCTS_PROPERTY,
						DEFAULT_CREATE_SCENARIO_PRODUCTS));
		LOGGER.config("createScenarioProducts = " + createScenarioProducts);
	}

	/**
	 * Interrupts and forgets the poll thread, if one is running, then shuts
	 * down the superclass.
	 *
	 * @throws Exception if the superclass shutdown fails
	 */
	@Override
	public void shutdown() throws Exception {
		final Thread poller = pollThread;
		if (poller != null) {
			// run() treats an interrupt as the signal to stop polling
			poller.interrupt();
			pollThread = null;
		}

		super.shutdown();
	}

	/**
	 * Starts the superclass, then starts the poll thread unless one already
	 * exists.
	 *
	 * @throws Exception if the superclass startup fails
	 */
	@Override
	public void startup() throws Exception {
		super.startup();

		if (pollThread != null) {
			// already polling
			return;
		}

		final Thread poller = new Thread(this);
		poller.setName("poll thread");
		pollThread = poller;
		poller.start();
	}

	/** @return directory that is polled for input files */
	public File getPolldir() {
		return polldir;
	}

	/** @param polldir directory to poll for input files */
	public void setPolldir(File polldir) {
		this.polldir = polldir;
	}

	/** @return directory that processed files are moved into */
	public File getStoragedir() {
		return storagedir;
	}

	/** @param storagedir directory that processed files are moved into */
	public void setStoragedir(File storagedir) {
		this.storagedir = storagedir;
	}

	/** @return directory that files are moved into when processing fails */
	public File getErrordir() {
		return errordir;
	}

	/** @param errordir directory that files are moved into when processing fails */
	public void setErrordir(File errordir) {
		this.errordir = errordir;
	}

	/** @return the product creator used to create products from files */
	public ProductCreator getProductCreator() {
		return productCreator;
	}

	/** @param productCreator the product creator to use */
	public void setProductCreator(ProductCreator productCreator) {
		this.productCreator = productCreator;
	}

	/** @return directoryPoller */
	public DirectoryPoller getDirectoryPoller() {
		return directoryPoller;
	}

	/** @param directoryPoller to set */
	public void setDirectoryPoller(DirectoryPoller directoryPoller) {
		this.directoryPoller = directoryPoller;
	}

	/** @return poll interval in milliseconds */
	public long getPollInterval() {
		return pollInterval;
	}

	/** @param pollInterval poll interval in milliseconds */
	public void setPollInterval(long pollInterval) {
		this.pollInterval = pollInterval;
	}

	/** @return whether recently modified files are skipped while polling */
	public boolean isPollCarefully() {
		return pollCarefully;
	}

	/** @param pollCarefully whether to skip recently modified files while polling */
	public void setPollCarefully(boolean pollCarefully) {
		this.pollCarefully = pollCarefully;
	}

	/**
	 * @return whether product types are prefixed with "internal-"
	 */
	public boolean isCreateInternalProducts() {
		return createInternalProducts;
	}

	/**
	 * @param createInternalProducts
	 *            whether product types are prefixed with "internal-"
	 */
	public void setCreateInternalProducts(boolean createInternalProducts) {
		this.createInternalProducts = createInternalProducts;
	}

	/**
	 * @return whether product types are suffixed with "-scenario"
	 */
	public boolean isCreateScenarioProducts() {
		return createScenarioProducts;
	}

	/**
	 * @param createScenarioProducts
	 *            whether product types are suffixed with "-scenario"
	 */
	public void setCreateScenarioProducts(boolean createScenarioProducts) {
		this.createScenarioProducts = createScenarioProducts;
	}

	/**
	 * Parses a comma separated list of {@code host:port} entries into
	 * SocketProductSenders.
	 *
	 * Whitespace around entries, hosts and ports is ignored, and empty
	 * entries (for example from a trailing comma) are skipped.
	 *
	 * @param servers String of servers, split by commas
	 * @param connectTimeout connect timeout passed to each sender
	 * @param binaryFormat whether senders use binary format
	 * @param enableDeflate whether senders enable deflate
	 * @return List of product senders, one per non-empty entry
	 * @throws IllegalArgumentException if an entry has no {@code :port} part,
	 *             or (as NumberFormatException) the port is not an integer
	 */
	public static List<ProductSender> parseServers(final String servers,
			final Integer connectTimeout, final boolean binaryFormat,
			final boolean enableDeflate) {
		List<ProductSender> senders = new ArrayList<ProductSender>();

		Iterator<String> iter = StringUtils.split(servers, ",").iterator();
		while (iter.hasNext()) {
			String server = iter.next().trim();
			if (server.length() == 0) {
				// tolerate "a:1,,b:2" and trailing commas
				continue;
			}

			String[] parts = server.split(":");
			if (parts.length < 2) {
				// fail with a message naming the entry, instead of an
				// ArrayIndexOutOfBoundsException
				throw new IllegalArgumentException("Invalid server \""
						+ server + "\", expected host:port");
			}

			String host = parts[0].trim();
			int port = Integer.parseInt(parts[1].trim());
			SocketProductSender sender = new SocketProductSender(host, port,
					connectTimeout);
			sender.setBinaryFormat(binaryFormat);
			sender.setEnableDeflate(enableDeflate);
			senders.add(sender);
		}

		return senders;
	}

	/** Flag argument: print usage. */
	public static final String HELP_ARGUMENT = "--help";
	/** Flag argument: run continuously, polling the poll directory. */
	public static final String POLL_ARGUMENT = "--poll";

	/** Flag argument: enable pollCarefully. */
	public static final String POLL_CAREFULLY_ARGUMENT = "--pollCarefully";
	/** Argument prefix for the poll directory. */
	public static final String POLLDIR_ARGUMENT = "--polldir=";
	/** Argument prefix for the error directory. */
	public static final String ERRORDIR_ARGUMENT = "--errordir=";
	/** Argument prefix for the storage directory. */
	public static final String STORAGEDIR_ARGUMENT = "--oldinputdir=";
	/** Argument prefix for the poll interval. */
	public static final String POLL_INTERVAL_ARGUMENT = "--pollInterval=";
	/** Argument prefix for the tracker url. */
	public static final String TRACKER_URL_ARGUMENT = "--trackerURL=";

	/** Argument prefix for a single file to parse and send. */
	public static final String FILE_ARGUMENT = "--file=";

	/** Argument prefix for the parser class name. */
	public static final String PARSER_ARGUMENT = "--parser=";
	/** Flag argument: enable validation. */
	public static final String VALIDATE_ARGUMENT = "--validate";
	/** Argument prefix for the private key file. */
	public static final String PRIVATE_KEY_ARGUMENT = "--privateKey=";
	/** Argument prefix for the signature version. */
	public static final String SIGNATURE_VERSION_ARGUMENT = "--signatureVersion=";

	/** Argument prefix for the comma separated server list. */
	public static final String SERVERS_ARGUMENT = "--servers=";
	/** Default server list, as shown in the usage text. */
	public static final String SERVERS_DEFAULT = "prod01-pdl01.cr.usgs.gov:11235,prod02-pdl01.cr.usgs.gov:11235";
	/** Argument prefix for the connection timeout. */
	public static final String CONNECT_TIMEOUT_ARGUMENT = "--connectTimeout=";
	/** Default connection timeout, in milliseconds. */
	public static final Integer DEFAULT_CONNECT_TIMEOUT = 15000;
	/** Flag argument: send using binary format. */
	public static final String BINARY_FORMAT_ARGUMENT = "--binaryFormat";
	/** Flag argument: disable deflate when sending. */
	public static final String DISABLE_DEFLATE_ARGUMENT = "--disableDeflate";
	/** Argument prefix for a file or directory to attach; repeatable. */
	public static final String ATTACH_ARGUMENT = "--attach=";
	/** Flag argument: send origins when phases exist. */
	public static final String SEND_ORIGINS_WITH_PHASES = "--sendOriginWhenPhasesExist";
	/** Flag argument: send mechanisms when phases exist. */
	public static final String SEND_MECHANISMS_WITH_PHASES = "--sendMechanismWhenPhasesExist";

	/** Flag argument: create internal products. */
	public static final String CREATE_INTERNAL_PRODUCTS = "--internal";
	/** Flag argument: create scenario products. */
	public static final String CREATE_SCENARIO_PRODUCTS = "--scenario";

	/** Flag argument: test mode, products go to a DebugProductSender. */
	public static final String TEST_ARGUMENT = "--test";

	/**
	 * Bootstrappable interface: parses command line arguments, then either
	 * prints usage, starts polling, or parses and sends a single file.
	 *
	 * Settings already present on this object (for example from a
	 * configuration file) are kept unless an argument overrides them.
	 * Sender-related arguments ({@code --connectTimeout=},
	 * {@code --binaryFormat}, {@code --disableDeflate}) apply to
	 * {@code --servers=} regardless of argument order, and
	 * {@code --servers=} is ignored whenever {@code --test} is present.
	 *
	 * In single-file mode the JVM exits with
	 * CLIProductBuilder.EXIT_UNABLE_TO_SEND when any product failed on every
	 * sender or an exception occurred, and with
	 * CLIProductBuilder.EXIT_PARTIALLY_SENT when some sends failed.
	 *
	 * @param args command line arguments
	 * @throws Exception if an argument cannot be processed, including a
	 *             ConfigurationException when the {@code --parser=} class is
	 *             neither a ProductCreator nor a FileToQuakemlConverter
	 */
	@Override
	public void run(final String[] args) throws Exception {
		// save arguments in global for access by FileToQuakemlParser objects.
		EIDSInputWedge.ARGS = args;

		boolean test = false;
		boolean help = false;
		boolean poll = false;
		boolean validate = this.productCreator.isValidate();
		boolean sendOriginWhenPhasesExist = false;
		boolean sendMechanismWhenPhasesExist = false;

		// preserve any existing settings from config file
		if (productCreator instanceof QuakemlProductCreator) {
			sendOriginWhenPhasesExist = ((QuakemlProductCreator) productCreator)
					.isSendOriginWhenPhasesExist();
			sendMechanismWhenPhasesExist = ((QuakemlProductCreator) productCreator)
					.isSendMechanismWhenPhasesExist();
		} else if (productCreator instanceof EQMessageProductCreator) {
			sendOriginWhenPhasesExist = ((EQMessageProductCreator) productCreator)
					.isSendOriginWhenPhasesExist();
		}

		File file = null;
		// when sending 1 product, allow extra files to be attached.
		HashMap<String, Content> attachContent = new HashMap<String, Content>();
		Integer connectTimeout = DEFAULT_CONNECT_TIMEOUT;
		boolean binaryFormat = false;
		boolean enableDeflate = true;
		// value of the last servers argument; senders are created after all
		// arguments are read so timeout/format/deflate options always apply
		String servers = null;

		for (String arg : args) {
			if (arg.equals(HELP_ARGUMENT)) {
				help = true;
			} else if (arg.equals(POLL_ARGUMENT)) {
				poll = true;
			} else if (arg.equals(POLL_CAREFULLY_ARGUMENT)) {
				pollCarefully = true;
			} else if (arg.equals(SEND_ORIGINS_WITH_PHASES)) {
				sendOriginWhenPhasesExist = true;
			} else if (arg.equals(SEND_MECHANISMS_WITH_PHASES)) {
				sendMechanismWhenPhasesExist = true;
			} else if (arg.startsWith(POLLDIR_ARGUMENT)) {
				// substring strips only the leading prefix; the value may
				// itself contain the prefix text
				setPolldir(new File(arg.substring(POLLDIR_ARGUMENT.length())));
			} else if (arg.startsWith(ERRORDIR_ARGUMENT)) {
				setErrordir(new File(arg.substring(ERRORDIR_ARGUMENT.length())));
			} else if (arg.startsWith(STORAGEDIR_ARGUMENT)) {
				setStoragedir(new File(arg.substring(STORAGEDIR_ARGUMENT
						.length())));
			} else if (arg.startsWith(FILE_ARGUMENT)) {
				file = new File(arg.substring(FILE_ARGUMENT.length()));
			} else if (arg.startsWith(ATTACH_ARGUMENT)) {
				File attach = new File(arg.substring(ATTACH_ARGUMENT.length()));
				if (attach.isDirectory()) {
					attachContent.putAll(FileContent
							.getDirectoryContents(attach));
				} else {
					attachContent
							.put(attach.getName(), new FileContent(attach));
				}
			} else if (arg.startsWith(PARSER_ARGUMENT)) {
				String parserClassName = arg.substring(PARSER_ARGUMENT
						.length());
				Object parser = Class.forName(parserClassName)
						.getConstructor().newInstance();
				if (parser instanceof ProductCreator) {
					setProductCreator((ProductCreator) parser);
				} else if (parser instanceof FileToQuakemlConverter) {
					// same construction as configure(), so doBufferFix from a
					// configuration file is not silently dropped
					QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(
							doBufferFix);
					quakemlCreator
							.setConverter((FileToQuakemlConverter) parser);
					setProductCreator(quakemlCreator);
				} else {
					throw new ConfigurationException("parser class "
							+ parserClassName + " does not implement "
							+ FileToQuakemlConverter.class.getName());
				}
			} else if (arg.startsWith(VALIDATE_ARGUMENT)) {
				validate = true;
			} else if (arg.startsWith(SERVERS_ARGUMENT)) {
				servers = arg.substring(SERVERS_ARGUMENT.length());
			} else if (arg.startsWith(TEST_ARGUMENT)) {
				test = true;
				getProductSenders().clear();
				getProductSenders().add(new DebugProductSender());
			} else if (arg.startsWith(PRIVATE_KEY_ARGUMENT)) {
				setPrivateKey(CryptoUtils.readOpenSSHPrivateKey(FileUtils
						.readFile(new File(arg
								.substring(PRIVATE_KEY_ARGUMENT.length()))),
						null));
			} else if (arg.startsWith(SIGNATURE_VERSION_ARGUMENT)) {
				setSignatureVersion(Version.fromString(
						arg.substring(SIGNATURE_VERSION_ARGUMENT.length())));
			} else if (arg.startsWith(CONNECT_TIMEOUT_ARGUMENT)) {
				connectTimeout = Integer.valueOf(arg
						.substring(CONNECT_TIMEOUT_ARGUMENT.length()));
			} else if (arg.equals(BINARY_FORMAT_ARGUMENT)) {
				binaryFormat = true;
			} else if (arg.equals(DISABLE_DEFLATE_ARGUMENT)) {
				enableDeflate = false;
			} else if (arg.startsWith(POLL_INTERVAL_ARGUMENT)) {
				setPollInterval(Long.valueOf(arg
						.substring(POLL_INTERVAL_ARGUMENT.length())));
			} else if (arg.startsWith(TRACKER_URL_ARGUMENT)) {
				this.setTrackerURL(new URL(arg
						.substring(TRACKER_URL_ARGUMENT.length())));
			} else if (arg.equals(CREATE_INTERNAL_PRODUCTS)) {
				createInternalProducts = true;
			} else if (arg.equals(CREATE_SCENARIO_PRODUCTS)) {
				createScenarioProducts = true;
			} else if (arg.equals(CLIProductBuilder.DISABLE_PARALLEL_SEND)) {
				parallelSend = false;
			} else if (arg.startsWith(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT)) {
				parallelSendTimeout = Long.valueOf(
						arg.substring(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT
								.length()));
			}
		}

		// ignore servers argument when in test mode
		if (servers != null && !test) {
			getProductSenders().clear();
			getProductSenders().addAll(
					parseServers(servers, connectTimeout, binaryFormat,
							enableDeflate));
		}

		ProductCreator creator = getProductCreator();
		creator.setValidate(validate);
		if (creator instanceof EQMessageProductCreator) {
			((EQMessageProductCreator) creator)
					.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
		} else if (creator instanceof QuakemlProductCreator) {
			QuakemlProductCreator quakemlCreator = ((QuakemlProductCreator) creator);
			quakemlCreator
					.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
			quakemlCreator
					.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
		}

		if (
		// want usage, or didn't provide arguments
		(help || args.length == 0)
		// or didn't provide correct arguments
				|| (!poll && file == null)) {
			printUsage();
		}

		// run continuously
		else if (poll) {
			startup();
		}

		// send, then shutdown
		else {
			// file != null
			try {
				// send
				Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
						file, attachContent);

				// check how send went
				int numSenders = getProductSenders().size();
				int total = sendExceptions.size();
				int successful = 0;
				int partialFailures = 0;
				int totalFailures = 0;

				for (Map.Entry<ProductId, Map<ProductSender, Exception>> entry : sendExceptions
						.entrySet()) {
					ProductId sentId = entry.getKey();
					int numExceptions = entry.getValue().size();
					if (numExceptions == numSenders) {
						// every sender reported an exception
						totalFailures++;
						LOGGER.severe("Total failure sending product "
								+ sentId.toString());
					} else {
						// output built product id because it was sent at least
						// once
						System.out.println(sentId.toString());

						if (numExceptions == 0) {
							successful++;
						} else {
							partialFailures++;
							LOGGER.warning("Partial failure sending product "
									+ sentId.toString());
						}
					}
				}

				LOGGER.info("Generated " + total + " products: " + successful
						+ " sent, " + partialFailures + " partially sent, "
						+ totalFailures + " failed to send");

				// notify of failures using exit code
				if (totalFailures > 0) {
					System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
				}
				if (partialFailures > 0) {
					System.exit(CLIProductBuilder.EXIT_PARTIALLY_SENT);
				}
			} catch (Exception e) {
				if (e instanceof JAXBException
						&& ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
					SAXParseException spe = (SAXParseException) ((JAXBException) e)
							.getLinkedException();
					LOGGER.severe("Parse error: " + spe.getMessage()
							+ "; line=" + spe.getLineNumber() + ", column="
							+ spe.getColumnNumber());
				} else {
					LOGGER.log(Level.SEVERE, "Exception while sending", e);
				}
				System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
			}
		}

	}

	/**
	 * Prints command line usage to standard error, then exits the JVM with
	 * status 1.
	 *
	 * Lists every argument handled by {@link #run(String[])}, including the
	 * validate, pollCarefully and trackerURL arguments.
	 */
	public static void printUsage() {
		System.err
				.println("\nUsage:\n\n"
						+ "java -cp ProductClient.jar gov.usgs.earthquake.eids.EIDSInputWedge"
						+ " ("
						+ HELP_ARGUMENT
						+ "|"
						+ POLL_ARGUMENT
						+ "|"
						+ FILE_ARGUMENT
						+ "FILE) ["
						+ PRIVATE_KEY_ARGUMENT
						+ "KEYFILE] ["
						+ SIGNATURE_VERSION_ARGUMENT
						+ "VERSION] ["
						+ TRACKER_URL_ARGUMENT
						+ "URL] ["
						+ SERVERS_ARGUMENT
						+ "SERVERS] ["
						+ TEST_ARGUMENT
						+ "] ["
						+ CONNECT_TIMEOUT_ARGUMENT
						+ "TIMEOUT] ["
						+ PARSER_ARGUMENT
						+ "PARSER] ["
						+ VALIDATE_ARGUMENT
						+ "] ["
						+ POLLDIR_ARGUMENT
						+ "POLLDIR] ["
						+ POLL_INTERVAL_ARGUMENT
						+ "INTERVAL] ["
						+ POLL_CAREFULLY_ARGUMENT
						+ "] ["
						+ STORAGEDIR_ARGUMENT
						+ "STORAGEDIR] ["
						+ ERRORDIR_ARGUMENT
						+ "ERRORDIR] ["
						+ ATTACH_ARGUMENT
						+ "ATTACH] ["
						+ SEND_ORIGINS_WITH_PHASES
						+ "] ["
						+ SEND_MECHANISMS_WITH_PHASES
						+ "] ["
						+ CREATE_INTERNAL_PRODUCTS
						+ "] ["
						+ CREATE_SCENARIO_PRODUCTS
						+ "] ["
						+ BINARY_FORMAT_ARGUMENT
						+ "] ["
						+ DISABLE_DEFLATE_ARGUMENT
						+ "] ["
						+ CLIProductBuilder.DISABLE_PARALLEL_SEND
						+ "] ["
						+ CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT
						+ "300]");

		System.err.println();

		System.err.println("\t" + HELP_ARGUMENT);
		System.err.println("\t\tdisplay this message");
		System.err.println("\t" + FILE_ARGUMENT + "FILE");
		System.err.println("\t\tparse and send one file");
		System.err.println("\t" + POLL_ARGUMENT);
		System.err.println("\t\trun continuously, checking POLLDIR for files");

		System.err.println();

		System.err.println("\t" + PRIVATE_KEY_ARGUMENT + "KEYFILE");
		System.err.println("\t\topenssh private key used to sign products");
		System.err.println("\t" + SIGNATURE_VERSION_ARGUMENT + "VERSION");
		System.err.println("\t\t'v1' is default, 'v2' is other option.");
		System.err.println("\t" + TRACKER_URL_ARGUMENT + "URL");
		System.err.println("\t\ttracker url used by this product builder");

		System.err.println("\t" + CONNECT_TIMEOUT_ARGUMENT + "TIMEOUT");
		System.err.println("\t\tmilliseconds before timeout while connecting");
		System.err.println("\t\tdefault is \"" + DEFAULT_CONNECT_TIMEOUT
				+ "\"ms");
		System.err.println("\t\t(must appear before " + SERVERS_ARGUMENT + ")");

		System.err.println("\t" + SERVERS_ARGUMENT + "SERVERS");
		System.err
				.println("\t\tcomma delimited list of servers(host:port) where products are sent");
		System.err.println("\t\tdefault is \"" + SERVERS_DEFAULT + "\"");
		System.err.println("\t" + TEST_ARGUMENT);
		System.err.println("\t\tPrint generated products to console for testing, ignores "
				+ SERVERS_ARGUMENT);
		System.err.println("\t" + PARSER_ARGUMENT + "PARSER");
		System.err.println("\t\tclass that implements "
				+ "gov.usgs.earthquake.quakeml.FileToQuakemlConverter");
		System.err.println("\t\tdefault is \"" + DEFAULT_PARSER_CLASS + "\"");
		System.err.println("\t" + VALIDATE_ARGUMENT);
		System.err.println("\t\tenable validation in the parser");

		System.err.println();

		System.err.println("\t" + POLLDIR_ARGUMENT + "POLLDIR");
		System.err.println("\t\tdirectory to poll for messages");

		System.err.println("\t" + POLL_INTERVAL_ARGUMENT + "INTERVAL");
		System.err.println("\t\tmilliseconds between polling");
		System.err.println("\t\tdefault is \"" + DEFAULT_POLLINTERVAL + "\"ms");

		System.err.println("\t" + POLL_CAREFULLY_ARGUMENT);
		System.err
				.println("\t\tskip files modified within the last INTERVAL, in case they are still being written");

		System.err.println("\t" + STORAGEDIR_ARGUMENT + "STORAGEDIR");
		System.err.println("\t\tdirectory for files that were processed");

		System.err.println("\t" + ERRORDIR_ARGUMENT + "ERRORDIR");
		System.err.println("\t\tdirectory for files that weren't processed");

		System.err.println("\t" + ATTACH_ARGUMENT + "ATTACH");
		System.err
				.println("\t\tattach a file or directory to one generated product, repeatable");
		System.err
				.println("\t\tdirectory trees are preserved, each path must be unique");
		System.err
				.println("\t\tif more than one product is generated, an exception will be thrown");
		System.err.println("\t" + SEND_ORIGINS_WITH_PHASES);
		System.err
				.println("\t\tWhen a phase-data product is generated, also send an origin product without the phase data");
		System.err.println("\t" + SEND_MECHANISMS_WITH_PHASES);
		System.err
				.println("\t\tWhen a phase-data product is generated, also send focal mechanism products without the phase data");
		System.err.println();

		System.err.println("\t" + CREATE_INTERNAL_PRODUCTS);
		System.err
				.println("\t\tuse the product type prefix 'internal-' for all generated products");
		System.err.println("\t" + CREATE_SCENARIO_PRODUCTS);
		System.err
				.println("\t\tuse the product type suffix '-scenario' for all generated products");

		System.err.println("\t" + BINARY_FORMAT_ARGUMENT);
		System.err.println("\t\tsend to hub using binary format");

		System.err.println("\t" + DISABLE_DEFLATE_ARGUMENT);
		System.err.println("\t\tdisable deflate compression when sending to hubs");

		System.err.println("\t" + CLIProductBuilder.DISABLE_PARALLEL_SEND);
		System.err.println("\t\tsend to servers sequentially");

		System.err.println("\t" + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT);
		System.err.println("\t\ttimeout for parallel sends in seconds");

		System.exit(1);
	}

	/**
	 * Poll loop, run on the poll thread created by startup.
	 *
	 * Creates the poll directory when missing, then repeatedly lists it and
	 * passes each entry to {@link #onFile(File)} until the thread is
	 * interrupted. When pollCarefully is set, entries modified within the
	 * last pollInterval milliseconds are skipped until a later poll. Any file
	 * still present after onFile returns is moved to the storage directory
	 * (or deleted when none is configured); a file whose processing throws is
	 * moved to the error directory (or deleted when none is configured).
	 * Each pass is padded with a sleep so passes start at least pollInterval
	 * milliseconds apart.
	 */
	@Override
	public void run() {
		if (!polldir.exists()) {
			polldir.mkdirs();
		}
		while (!Thread.currentThread().isInterrupted()) {
			try {
				Date pollStart = new Date();

				// File.list returns null when polldir is not a directory or
				// cannot be read; treat that as an empty poll instead of
				// letting a NullPointerException end the poll thread
				String[] polldirFiles = polldir.list();
				if (polldirFiles == null) {
					LOGGER.warning("Unable to list poll directory "
							+ polldir.getPath());
					polldirFiles = new String[0];
				}
				if (polldirFiles.length > 0) {
					LOGGER.fine("Polldir contains " + polldirFiles.length
							+ " files");
				}

				for (String name : polldirFiles) {
					File file = new File(polldir, name);
					try {
						if (pollCarefully) {
							// wait until file is at least pollInterval ms old,
							// in case it is still being written
							long age = new Date().getTime() - file.lastModified();
							if (age <= pollInterval) {
								continue;
							}
						}

						onFile(file);

						// onFile normally moves or deletes the file itself;
						// only clean up when it is still in the poll directory
						if (file.exists()) {
							if (storagedir != null) {
								if (!storagedir.exists()) {
									storagedir.mkdirs();
								}
								file.renameTo(new File(storagedir, pollStart
										.getTime() + "_" + file.getName()));
							} else {
								FileUtils.deleteTree(file);
							}
						}
					} catch (Exception e) {
						// log the cause; previously it was dropped silently
						LOGGER.log(Level.WARNING, "Exception processing file "
								+ file.getName(), e);
						if (errordir != null) {
							if (!errordir.exists()) {
								errordir.mkdirs();
							}
							file.renameTo(new File(errordir, pollStart
									.getTime() + "_" + file.getName()));
						} else {
							LOGGER.warning("Error processing file "
									+ file.getName()
									+ ", and no error directory configured");
							FileUtils.deleteTree(file);
						}
					}
				}

				// sleep for the remainder of the poll interval, if any
				long pollTime = new Date().getTime() - pollStart.getTime();
				if (pollTime < pollInterval) {
					Thread.sleep(pollInterval - pollTime);
				}
			} catch (InterruptedException ie) {
				// interrupted means shutdown; keep the interrupt status
				// visible to whoever inspects this thread afterwards
				Thread.currentThread().interrupt();
				return;
			}
		}

	}

}