EIDSInputWedge.java

  1. package gov.usgs.earthquake.eids;

  2. import java.io.File;
  3. import java.net.URL;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.HashMap;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.logging.Level;
  11. import java.util.logging.Logger;

  12. import javax.xml.bind.JAXBException;

  13. import org.xml.sax.SAXParseException;

  14. import gov.usgs.earthquake.distribution.Bootstrappable;
  15. import gov.usgs.earthquake.distribution.CLIProductBuilder;
  16. import gov.usgs.earthquake.distribution.ConfigurationException;
  17. import gov.usgs.earthquake.distribution.ProductBuilder;
  18. import gov.usgs.earthquake.distribution.ProductSender;
  19. import gov.usgs.earthquake.distribution.SocketProductSender;
  20. import gov.usgs.earthquake.product.Content;
  21. import gov.usgs.earthquake.product.FileContent;
  22. import gov.usgs.earthquake.product.Product;
  23. import gov.usgs.earthquake.product.ProductId;
  24. import gov.usgs.earthquake.quakeml.FileToQuakemlConverter;
  25. import gov.usgs.util.Config;
  26. import gov.usgs.util.CryptoUtils;
  27. import gov.usgs.util.DirectoryPoller;
  28. import gov.usgs.util.FileUtils;
  29. import gov.usgs.util.StringUtils;
  30. import gov.usgs.util.CryptoUtils.Version;

  31. /**
  32.  * Read messages from files or a poll directory, and push products into PDL.
  33.  *
  34.  * This is supports EIDS/QDDS style polling. The input messages are converted to
  35.  * Quakeml using the FileToQuakemlConverter interface, then sent as Quakeml
  36.  * based products.
  37.  *
  38.  * Much of the configuration can be supplied using either a configuration file,
  39.  * or command line arguments.
  40.  */
  41. public class EIDSInputWedge extends ProductBuilder implements Runnable,
  42.         Bootstrappable {

  43.     /**
  44.      * Global reference to arguments array, when EIDSInputWedge is run via
  45.      * Bootstrap.
  46.      */
  47.     public static String[] ARGS = null;

  48.     private static final Logger LOGGER = Logger.getLogger(EIDSInputWedge.class
  49.             .getName());

  50.     /** Property for parser class */
  51.     public static final String PARSER_CLASS_PROPERTY = "parserClass";
  52.     /** Default parser class */
  53.     public static final String DEFAULT_PARSER_CLASS = "gov.usgs.earthquake.event.QuakemlToQuakemlConverter";

  54.     /** Property for polldir */
  55.     public static final String POLLDIR_PROPERTY = "directory";
  56.     /** Default polldir */
  57.     public static final String DEFAULT_POLLDIR = "polldir";
  58.     private File polldir = new File(DEFAULT_POLLDIR);

  59.     /** Property for storage directory */
  60.     public static final String STORAGEDIR_PROPERTY = "oldinputdir";
  61.     /** Default storage directory */
  62.     public static final String DEFAULT_STORAGEDIR = "oldinput";
  63.     private File storagedir = new File(DEFAULT_STORAGEDIR);

  64.     /** Property for error directory */
  65.     public static final String ERRORDIR_PROPERTY = "errordir";
  66.     /** Default error directory */
  67.     public static final String DEFAULT_ERRORDIR = "errordir";
  68.     private File errordir = new File(DEFAULT_ERRORDIR);

  69.     /** Property for validate */
  70.     public static final String VALIDATE_PROPERTY = "validate";
  71.     /** Default status of validate */
  72.     public static final String DEFAULT_VALIDATE = "false";

  73.     /** Property for sendOriginWhenPhasesExist */
  74.     public static final String SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY = "sendOriginWhenPhasesExist";
  75.     /** Default status of sendOrigin...  */
  76.     public static final String DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST = "false";

  77.     /** Property for sendMechanismWhenPhasesExist */
  78.     public static final String SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY = "sendMechanismWhenPhasesExist";
  79.     /** Default status of sendMechanism... */
  80.     public static final String DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST = "false";

  81.     /** Convert parsed quakeml to a product. */
  82.     private ProductCreator productCreator = new QuakemlProductCreator();

  83.     /** Whether created products should be converted to internal types. */
  84.     public static final String CREATE_INTERNAL_PRODUCTS_PROPERTY = "createInternalProducts";
  85.     /** Default status of CREATE_INTERNAL_PRODUCTS */
  86.     public static final String DEFAULT_CREATE_INTERNAL_PRODUCTS = "false";
  87.     private boolean createInternalProducts = false;

  88.     /** Whether created products should be converted to scenario types. */
  89.     public static final String CREATE_SCENARIO_PRODUCTS_PROPERTY = "createScenarioProducts";
  90.     /** Default status of CREATE_SCENARIO_PRODUCTS */
  91.     public static final String DEFAULT_CREATE_SCENARIO_PRODUCTS = "false";
  92.     private boolean createScenarioProducts = false;

  93.     /** Directory polling object. */
  94.     private DirectoryPoller directoryPoller;

  95.     /** Poll interval property */
  96.     public static final String POLLINTERVAL_PROPERTY = "interval";
  97.     /** Default interval for POLLINTERVAL */
  98.     public static final String DEFAULT_POLLINTERVAL = "1000";
  99.     private long pollInterval = 1000L;

  100.     /** Property for pollCarefully */
  101.     public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
  102.     /** Default status of POLL_CAREFULLY */
  103.     public static final String DEFAULT_POLL_CAREFULLY = "false";
  104.     private boolean pollCarefully = false;

  105.     /** Property for doBufferFix */
  106.     public static final String DO_BUFFER_FIX_PROPERTY = "doBufferFix";
  107.     /** Default status of DO_BUFFER_FIX property */
  108.     public static final String DEFAULT_DO_BUFFER_FIX = "true";
  109.     private boolean doBufferFix = true;

  110.     private Thread pollThread = null;

  111.     /**
  112.      * Empty constructor
  113.      * @throws Exception if error occurs
  114.      */
  115.     public EIDSInputWedge() throws Exception {
  116.     }

  117.     /**
  118.      * Gets products from file and iterates through each product
  119.      * During iteration, sets type to internal/scenario if createInternalProducts
  120.      * or createScenarioProducts is true. Attaches Content files to product,
  121.      * Sends product
  122.      * @param file File containing products
  123.      * @param attachContent Map of String and Content
  124.      * @return Map of product IDs and sent products
  125.      * @throws Exception if error occurs
  126.      */
  127.     public Map<ProductId, Map<ProductSender, Exception>> parseAndSend(
  128.             final File file, final Map<String, Content> attachContent)
  129.             throws Exception {

  130.         Map<ProductId, Map<ProductSender, Exception>> sendProductResults = new HashMap<ProductId, Map<ProductSender, Exception>>();

  131.         List<Product> products = productCreator.getProducts(file);

  132.         Iterator<Product> iter = products.iterator();
  133.         while (iter.hasNext()) {
  134.             Product product = iter.next();
  135.             ProductId id = product.getId();

  136.             if (createInternalProducts) {
  137.                 id.setType("internal-" + id.getType());
  138.             }
  139.             if (createScenarioProducts) {
  140.                 id.setType(id.getType() + "-scenario");
  141.             }

  142.             // attach files to generated product
  143.             if (attachContent != null && attachContent.size() > 0) {
  144.                 if (products.size() > 1) {
  145.                     throw new Exception("Trying to attach files,"
  146.                             + " generated more than 1 product");
  147.                 }
  148.                 product.getContents().putAll(attachContent);
  149.             }

  150.             // send product, save any exceptions
  151.             sendProductResults.put(product.getId(), sendProduct(product));
  152.         }

  153.         return sendProductResults;
  154.     }

  155.     /**
  156.      * Parses given file, looking for send exceptions and reports statistics
  157.      * @param file to parse and look for errors
  158.      */
  159.     public void onFile(File file) {
  160.         Date inputtime = new Date();
  161.         LOGGER.info("Reading file " + file.getName());

  162.         try {
  163.             Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
  164.                     file, null);

  165.             // check how send went
  166.             int numSenders = getProductSenders().size();
  167.             int total = sendExceptions.size();
  168.             int successful = 0;
  169.             int partialFailures = 0;
  170.             int totalFailures = 0;

  171.             Iterator<ProductId> sentIds = sendExceptions.keySet().iterator();
  172.             while (sentIds.hasNext()) {
  173.                 ProductId sentId = sentIds.next();
  174.                 if (sendExceptions.get(sentId).size() == numSenders) {
  175.                     totalFailures++;
  176.                     LOGGER.severe("Total failure sending product "
  177.                             + sentId.toString());
  178.                 } else {
  179.                     // output built product id because it was sent at least once
  180.                     System.out.println(sentId.toString());

  181.                     if (sendExceptions.get(sentId).size() == 0) {
  182.                         successful++;
  183.                     } else {
  184.                         partialFailures++;
  185.                         LOGGER.warning("Partial failure sending product "
  186.                                 + sentId.toString());
  187.                     }
  188.                 }
  189.             }

  190.             LOGGER.info("generated " + total + " products: " + successful
  191.                     + " sent, " + partialFailures + " partially sent, "
  192.                     + totalFailures + " failed to send");

  193.             // notify of failures using exit code
  194.             if (totalFailures > 0) {
  195.                 // consider this failure, event if some products sent
  196.                 throw new Exception();
  197.             }

  198.             // succeeded, at least somewhat
  199.             // move to oldinput directory
  200.             file.renameTo(new File(storagedir, file.getName() + "_"
  201.                     + inputtime.getTime()));
  202.         } catch (Exception e) {
  203.             if (e instanceof JAXBException
  204.                     && ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
  205.                 SAXParseException spe = (SAXParseException) ((JAXBException) e)
  206.                         .getLinkedException();
  207.                 LOGGER.warning("Parse error: " + spe.getMessage() + "; line="
  208.                         + spe.getLineNumber() + ", column="
  209.                         + spe.getColumnNumber());
  210.             }

  211.             if (errordir != null) {
  212.                 if (!errordir.exists()) {
  213.                     errordir.mkdirs();
  214.                 }
  215.                 File errorfile = new File(errordir, file.getName() + "_"
  216.                         + new Date().getTime());

  217.                 // move to error directory
  218.                 try {
  219.                 LOGGER.log(Level.WARNING, "Errors processing file, moving to "
  220.                         + errorfile.getCanonicalPath(), e);
  221.                 } catch (Exception ignore) {
  222.                     //ignore
  223.                 }
  224.                 file.renameTo(errorfile);
  225.             } else {
  226.                 LOGGER.warning("Error processing file " + file.getName()
  227.                         + ", and no error directory configured");
  228.                 FileUtils.deleteTree(file);
  229.             }
  230.         }
  231.     }

  232.     @Override
  233.     public void configure(Config config) throws Exception {
  234.         super.configure(config);

  235.         String parserClassName = config.getProperty(PARSER_CLASS_PROPERTY);
  236.         if (parserClassName == null) {
  237.             LOGGER.config("Using QuakemlToProductConverter");
  238.         } else {
  239.             Object parserObj = Class.forName(parserClassName)
  240.                     .getConstructor().newInstance();
  241.             if (parserObj instanceof ProductCreator) {
  242.                 productCreator = (ProductCreator) parserObj;
  243.             } else if (parserObj instanceof FileToQuakemlConverter) {
  244.                 QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(doBufferFix);
  245.                 quakemlCreator.setConverter((FileToQuakemlConverter) parserObj);
  246.                 productCreator = quakemlCreator;
  247.             } else {
  248.                 throw new ConfigurationException("configured parser class "
  249.                         + parserClassName + " does not implement "
  250.                         + FileToQuakemlConverter.class.getName());
  251.             }
  252.             LOGGER.config("Using parser class " + parserClassName);
  253.         }

  254.         boolean validate = Boolean.getBoolean(config.getProperty(
  255.                 VALIDATE_PROPERTY, DEFAULT_VALIDATE));
  256.         productCreator.setValidate(validate);
  257.         LOGGER.config("Validation " + (validate ? "enabled" : "disabled"));

  258.         boolean sendOriginWhenPhasesExist = Boolean.valueOf(config
  259.                 .getProperty(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY,
  260.                         DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST));
  261.         if (productCreator instanceof EQMessageProductCreator) {
  262.             ((EQMessageProductCreator) productCreator)
  263.                     .setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
  264.         } else {
  265.             ((QuakemlProductCreator) productCreator)
  266.                     .setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
  267.         }
  268.         LOGGER.config("sendOriginWhenPhasesExist = "
  269.                 + sendOriginWhenPhasesExist);

  270.         boolean sendMechanismWhenPhasesExist = Boolean.valueOf(config
  271.                 .getProperty(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY,
  272.                         DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST));
  273.         if (productCreator instanceof EQMessageProductCreator) {
  274.             if (sendMechanismWhenPhasesExist) {
  275.                 LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY
  276.                         + " is not supported for EQMessageProductCreator");
  277.             }
  278.         } else {
  279.             ((QuakemlProductCreator) productCreator)
  280.                     .setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
  281.         }
  282.         LOGGER.config("sendMechanismWhenPhasesExist = "
  283.                 + sendMechanismWhenPhasesExist);

  284.         polldir = new File(
  285.                 config.getProperty(POLLDIR_PROPERTY, DEFAULT_POLLDIR));
  286.         LOGGER.config("Using poll directory " + polldir.getCanonicalPath());

  287.         pollInterval = Long.valueOf(config.getProperty(POLLINTERVAL_PROPERTY,
  288.                 DEFAULT_POLLINTERVAL));
  289.         LOGGER.config("Using poll interval " + pollInterval + "ms");

  290.         pollCarefully = Boolean.valueOf(config.getProperty(POLL_CAREFULLY_PROPERTY,
  291.                 DEFAULT_POLL_CAREFULLY));
  292.         LOGGER.config("Poll carefully = " + pollCarefully);

  293.         storagedir = new File(config.getProperty(STORAGEDIR_PROPERTY,
  294.                 DEFAULT_STORAGEDIR));
  295.         LOGGER.config("Using oldinput directory "
  296.                 + storagedir.getCanonicalPath());

  297.         errordir = new File(config.getProperty(ERRORDIR_PROPERTY,
  298.                 DEFAULT_ERRORDIR));
  299.         LOGGER.config("Using error directory " + errordir.getCanonicalPath());

  300.         createInternalProducts = Boolean.valueOf(config
  301.                 .getProperty(CREATE_INTERNAL_PRODUCTS_PROPERTY,
  302.                         DEFAULT_CREATE_INTERNAL_PRODUCTS));
  303.         LOGGER.config("createInternalProducts = " + createInternalProducts);

  304.         createScenarioProducts = Boolean.valueOf(config
  305.                 .getProperty(CREATE_SCENARIO_PRODUCTS_PROPERTY,
  306.                         DEFAULT_CREATE_SCENARIO_PRODUCTS));
  307.         LOGGER.config("createScenarioProducts = " + createScenarioProducts);

  308.         doBufferFix = Boolean.valueOf(config
  309.                 .getProperty(DO_BUFFER_FIX_PROPERTY,
  310.                                 DEFAULT_DO_BUFFER_FIX));
  311.         LOGGER.config("doBufferFix = " + doBufferFix);
  312.     }

  313.     @Override
  314.     public void shutdown() throws Exception {
  315.         if (pollThread != null) {
  316.             pollThread.interrupt();
  317.             pollThread = null;
  318.         }

  319.         super.shutdown();
  320.     }

  321.     @Override
  322.     public void startup() throws Exception {
  323.         super.startup();

  324.         if (pollThread == null) {
  325.             pollThread = new Thread(this);
  326.             pollThread.setName("poll thread");
  327.             pollThread.start();
  328.         }
  329.     }

  330.     /** @return polldir */
  331.     public File getPolldir() {
  332.         return polldir;
  333.     }

  334.     /** @param polldir File to set */
  335.     public void setPolldir(File polldir) {
  336.         this.polldir = polldir;
  337.     }

  338.     /** @return storagedir */
  339.     public File getStoragedir() {
  340.         return storagedir;
  341.     }

  342.     /** @param storagedir File to set */
  343.     public void setStoragedir(File storagedir) {
  344.         this.storagedir = storagedir;
  345.     }

  346.     /** @return errordir */
  347.     public File getErrordir() {
  348.         return errordir;
  349.     }

  350.     /** @param errordir File to send */
  351.     public void setErrordir(File errordir) {
  352.         this.errordir = errordir;
  353.     }

  354.     /** @return productCreator */
  355.     public ProductCreator getProductCreator() {
  356.         return productCreator;
  357.     }

  358.     /** @param productCreator to set */
  359.     public void setProductCreator(ProductCreator productCreator) {
  360.         this.productCreator = productCreator;
  361.     }

  362.     /** @return directoryPoller */
  363.     public DirectoryPoller getDirectoryPoller() {
  364.         return directoryPoller;
  365.     }

  366.     /** @param directoryPoller to set */
  367.     public void setDirectoryPoller(DirectoryPoller directoryPoller) {
  368.         this.directoryPoller = directoryPoller;
  369.     }

  370.     /** @return pollInterval long */
  371.     public long getPollInterval() {
  372.         return pollInterval;
  373.     }

  374.     /** @param pollInterval long to set */
  375.     public void setPollInterval(long pollInterval) {
  376.         this.pollInterval = pollInterval;
  377.     }

  378.     /** @return pollCarefully boolean */
  379.     public boolean isPollCarefully() {
  380.         return pollCarefully;
  381.     }

  382.     /** @param pollCarefully boolean to set */
  383.     public void setPollCarefully(boolean pollCarefully) {
  384.         this.pollCarefully = pollCarefully;
  385.     }

  386.     /**
  387.      * @return the createInternalProducts
  388.      */
  389.     public boolean isCreateInternalProducts() {
  390.         return createInternalProducts;
  391.     }

  392.     /**
  393.      * @param createInternalProducts
  394.      *            the createInternalProducts to set
  395.      */
  396.     public void setCreateInternalProducts(boolean createInternalProducts) {
  397.         this.createInternalProducts = createInternalProducts;
  398.     }

  399.     /**
  400.      * @return the createScenarioProducts
  401.      */
  402.     public boolean isCreateScenarioProducts() {
  403.         return createScenarioProducts;
  404.     }

  405.     /**
  406.      * @param createScenarioProducts
  407.      *            the createScenarioProducts to set
  408.      */
  409.     public void setCreateScenarioProducts(boolean createScenarioProducts) {
  410.         this.createScenarioProducts = createScenarioProducts;
  411.     }

  412.     /**
  413.      * Parses a string of servers into SocketProductSenders, all put into a list
  414.      * of product senders
  415.      * @param servers String of servers, split by commas
  416.      * @param connectTimeout int timeout
  417.      * @param binaryFormat boolean if binary format
  418.      * @param enableDeflate boolean if Deflate should be enabled
  419.      * @return List of product senders
  420.      */
  421.     public static List<ProductSender> parseServers(final String servers,
  422.             final Integer connectTimeout, final boolean binaryFormat,
  423.             final boolean enableDeflate) {
  424.         List<ProductSender> senders = new ArrayList<ProductSender>();

  425.         Iterator<String> iter = StringUtils.split(servers, ",").iterator();
  426.         while (iter.hasNext()) {
  427.             String server = iter.next();
  428.             String[] parts = server.split(":");
  429.             SocketProductSender sender = new SocketProductSender(parts[0],
  430.                     Integer.parseInt(parts[1]), connectTimeout);
  431.             sender.setBinaryFormat(binaryFormat);
  432.             sender.setEnableDeflate(enableDeflate);
  433.             senders.add(sender);
  434.         }

  435.         return senders;
  436.     }

  437.     /** Argument for help */
  438.     public static final String HELP_ARGUMENT = "--help";
  439.     /** Argument for poll */
  440.     public static final String POLL_ARGUMENT = "--poll";

  441.     /** Argument for poleCarefully */
  442.     public static final String POLL_CAREFULLY_ARGUMENT = "--pollCarefully";
  443.     /** Argument for polldir */
  444.     public static final String POLLDIR_ARGUMENT = "--polldir=";
  445.     /** Argument for errordir */
  446.     public static final String ERRORDIR_ARGUMENT = "--errordir=";
  447.     /** Argument for storagedir */
  448.     public static final String STORAGEDIR_ARGUMENT = "--oldinputdir=";
  449.     /** Argument for poll interval */
  450.     public static final String POLL_INTERVAL_ARGUMENT = "--pollInterval=";
  451.     /** Argument for tracker url */
  452.     public static final String TRACKER_URL_ARGUMENT = "--trackerURL=";

  453.     /** Argument for file */
  454.     public static final String FILE_ARGUMENT = "--file=";

  455.     /** Argument for parser */
  456.     public static final String PARSER_ARGUMENT = "--parser=";
  457.     /** Argument for validate */
  458.     public static final String VALIDATE_ARGUMENT = "--validate";
  459.     /** Argument for privateKey */
  460.     public static final String PRIVATE_KEY_ARGUMENT = "--privateKey=";
  461.     /** Argument for signatureVersion */
  462.     public static final String SIGNATURE_VERSION_ARGUMENT = "--signatureVersion=";

  463.     /** Argument for servers */
  464.     public static final String SERVERS_ARGUMENT = "--servers=";
  465.     /** Default server for server argument */
  466.     public static final String SERVERS_DEFAULT = "prod01-pdl01.cr.usgs.gov:11235,prod02-pdl01.cr.usgs.gov:11235";
  467.     /** Argument for connection timeout */
  468.     public static final String CONNECT_TIMEOUT_ARGUMENT = "--connectTimeout=";
  469.     /** Default timeout for connection */
  470.     public static final Integer DEFAULT_CONNECT_TIMEOUT = 15000;
  471.     /** Argument for binaryFormat */
  472.     public static final String BINARY_FORMAT_ARGUMENT = "--binaryFormat";
  473.     /** Argument for disableDeflate */
  474.     public static final String DISABLE_DEFLATE_ARGUMENT = "--disableDeflate";
  475.     /** Argument for attach */
  476.     public static final String ATTACH_ARGUMENT = "--attach=";
  477.     /** Argument for sending origin with phases */
  478.     public static final String SEND_ORIGINS_WITH_PHASES = "--sendOriginWhenPhasesExist";
  479.     /** Argument for sending mechanisms with phases */
  480.     public static final String SEND_MECHANISMS_WITH_PHASES = "--sendMechanismWhenPhasesExist";

  481.     /** Argument for creating internal products */
  482.     public static final String CREATE_INTERNAL_PRODUCTS = "--internal";
  483.     /** Argument for creating scenario products */
  484.     public static final String CREATE_SCENARIO_PRODUCTS = "--scenario";

  485.     /** Argument for testing */
  486.     public static final String TEST_ARGUMENT = "--test";

  487.     /**
  488.      * Bootstrappable interface.
  489.      */
  490.     @Override
  491.     public void run(final String[] args) throws Exception {
  492.         // save arguments in global for access by FileToQuakemlParser objects.
  493.         EIDSInputWedge.ARGS = args;

  494.         boolean test = false;
  495.         boolean help = false;
  496.         boolean poll = false;
  497.         boolean validate = this.productCreator.isValidate();
  498.         boolean sendOriginWhenPhasesExist = false;
  499.         boolean sendMechanismWhenPhasesExist = false;

  500.         // preserve any existing settings from config file
  501.         if (productCreator instanceof QuakemlProductCreator) {
  502.             sendOriginWhenPhasesExist = ((QuakemlProductCreator) productCreator)
  503.                     .isSendOriginWhenPhasesExist();
  504.             sendMechanismWhenPhasesExist = ((QuakemlProductCreator) productCreator)
  505.                     .isSendMechanismWhenPhasesExist();
  506.         } else if (productCreator instanceof EQMessageProductCreator) {
  507.             sendOriginWhenPhasesExist = ((EQMessageProductCreator) productCreator)
  508.                     .isSendOriginWhenPhasesExist();
  509.         }

  510.         File file = null;
  511.         // when sending 1 product, allow extra files to be attached.
  512.         HashMap<String, Content> attachContent = new HashMap<String, Content>();
  513.         Integer connectTimeout = DEFAULT_CONNECT_TIMEOUT;
  514.         boolean binaryFormat = false;
  515.         boolean enableDeflate = true;

  516.         StringBuffer arguments = new StringBuffer();
  517.         for (String arg : args) {
  518.             arguments.append(arg).append(" ");
  519.             if (arg.equals(HELP_ARGUMENT)) {
  520.                 help = true;
  521.             } else if (arg.equals(POLL_ARGUMENT)) {
  522.                 poll = true;
  523.             } else if (arg.equals(POLL_CAREFULLY_ARGUMENT)) {
  524.                 pollCarefully = true;
  525.             } else if (arg.equals(SEND_ORIGINS_WITH_PHASES)) {
  526.                 sendOriginWhenPhasesExist = true;
  527.             } else if (arg.equals(SEND_MECHANISMS_WITH_PHASES)) {
  528.                 sendMechanismWhenPhasesExist = true;
  529.             } else if (arg.startsWith(POLLDIR_ARGUMENT)) {
  530.                 setPolldir(new File(arg.replace(POLLDIR_ARGUMENT, "")));
  531.             } else if (arg.startsWith(ERRORDIR_ARGUMENT)) {
  532.                 setErrordir(new File(arg.replace(ERRORDIR_ARGUMENT, "")));
  533.             } else if (arg.startsWith(STORAGEDIR_ARGUMENT)) {
  534.                 setStoragedir(new File(arg.replace(STORAGEDIR_ARGUMENT, "")));
  535.             } else if (arg.startsWith(FILE_ARGUMENT)) {
  536.                 file = new File(arg.replace(FILE_ARGUMENT, ""));
  537.             } else if (arg.startsWith(ATTACH_ARGUMENT)) {
  538.                 File attach = new File(arg.replace(ATTACH_ARGUMENT, ""));
  539.                 if (attach.isDirectory()) {
  540.                     attachContent.putAll(FileContent
  541.                             .getDirectoryContents(attach));
  542.                 } else {
  543.                     attachContent
  544.                             .put(attach.getName(), new FileContent(attach));
  545.                 }
  546.             } else if (arg.startsWith(PARSER_ARGUMENT)) {
  547.                 Object parser = Class.forName(arg.replace(PARSER_ARGUMENT, ""))
  548.                         .getConstructor().newInstance();
  549.                 if (parser instanceof ProductCreator) {
  550.                     setProductCreator((ProductCreator) parser);
  551.                 } else {
  552.                     QuakemlProductCreator productCreator = new QuakemlProductCreator();
  553.                     productCreator
  554.                             .setConverter((FileToQuakemlConverter) parser);
  555.                     setProductCreator(productCreator);
  556.                 }
  557.             } else if (arg.startsWith(VALIDATE_ARGUMENT)) {
  558.                 validate = true;
  559.             } else if (arg.startsWith(SERVERS_ARGUMENT)) {
  560.                 // ignore servers argument when in test mode
  561.                 if (!test) {
  562.                     getProductSenders().clear();
  563.                     getProductSenders().addAll(
  564.                             parseServers(arg.replace(SERVERS_ARGUMENT, ""),
  565.                                     connectTimeout, binaryFormat, enableDeflate));
  566.                 }
  567.             } else if (arg.startsWith(TEST_ARGUMENT)) {
  568.                 test = true;
  569.                 getProductSenders().clear();
  570.                 getProductSenders().add(new DebugProductSender());
  571.             } else if (arg.startsWith(PRIVATE_KEY_ARGUMENT)) {
  572.                 setPrivateKey(CryptoUtils.readOpenSSHPrivateKey(FileUtils
  573.                         .readFile(new File(arg
  574.                                 .replace(PRIVATE_KEY_ARGUMENT, ""))), null));
  575.             } else if (arg.startsWith(SIGNATURE_VERSION_ARGUMENT)) {
  576.                 setSignatureVersion(Version.fromString(
  577.                         arg.replace(SIGNATURE_VERSION_ARGUMENT, "")));
  578.             } else if (arg.startsWith(CONNECT_TIMEOUT_ARGUMENT)) {
  579.                 connectTimeout = Integer.valueOf(arg.replace(
  580.                         CONNECT_TIMEOUT_ARGUMENT, ""));
  581.             } else if (arg.equals(BINARY_FORMAT_ARGUMENT)) {
  582.                 binaryFormat = true;
  583.             } else if (arg.equals(DISABLE_DEFLATE_ARGUMENT)) {
  584.                 enableDeflate = false;
  585.             } else if (arg.startsWith(POLL_INTERVAL_ARGUMENT)) {
  586.                 setPollInterval(Long.valueOf(arg.replace(
  587.                         POLL_INTERVAL_ARGUMENT, "")));
  588.             } else if (arg.startsWith(TRACKER_URL_ARGUMENT)) {
  589.                 this.setTrackerURL(new URL(arg
  590.                         .replace(TRACKER_URL_ARGUMENT, "")));
  591.             } else if (arg.equals(CREATE_INTERNAL_PRODUCTS)) {
  592.                 createInternalProducts = true;
  593.             } else if (arg.equals(CREATE_SCENARIO_PRODUCTS)) {
  594.                 createScenarioProducts = true;
  595.             } else if (arg.equals(CLIProductBuilder.DISABLE_PARALLEL_SEND)) {
  596.                 parallelSend = false;
  597.             } else if (arg.startsWith(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT)) {
  598.                 parallelSendTimeout = Long.valueOf(
  599.                         arg.replace(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT, ""));
  600.             }
  601.         }

  602.         ProductCreator creator = getProductCreator();
  603.         creator.setValidate(validate);
  604.         if (creator instanceof EQMessageProductCreator) {
  605.             ((EQMessageProductCreator) creator)
  606.                     .setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
  607.         } else if (creator instanceof QuakemlProductCreator) {
  608.             QuakemlProductCreator quakemlCreator = ((QuakemlProductCreator) creator);
  609.             quakemlCreator
  610.                     .setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
  611.             quakemlCreator
  612.                     .setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
  613.         }

  614.         if (
  615.         // want usage, or didn't provide arguments
  616.         (help || args.length == 0)
  617.         // or didn't provide correct arguments
  618.                 || (!poll && file == null)) {
  619.             printUsage();
  620.         }

  621.         // run continuously
  622.         else if (poll) {
  623.             startup();
  624.         }

  625.         // send, then shutdown
  626.         else {
  627.             // file != null
  628.             try {
  629.                 // send
  630.                 Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
  631.                         file, attachContent);

  632.                 // check how send went
  633.                 int numSenders = getProductSenders().size();
  634.                 int total = sendExceptions.size();
  635.                 int successful = 0;
  636.                 int partialFailures = 0;
  637.                 int totalFailures = 0;

  638.                 Iterator<ProductId> sentIds = sendExceptions.keySet()
  639.                         .iterator();
  640.                 while (sentIds.hasNext()) {
  641.                     ProductId sentId = sentIds.next();
  642.                     if (sendExceptions.get(sentId).size() == numSenders) {
  643.                         totalFailures++;
  644.                         LOGGER.severe("Total failure sending product "
  645.                                 + sentId.toString());
  646.                     } else {
  647.                         // output built product id because it was sent at least
  648.                         // once
  649.                         System.out.println(sentId.toString());

  650.                         if (sendExceptions.get(sentId).size() == 0) {
  651.                             successful++;
  652.                         } else {
  653.                             partialFailures++;
  654.                             LOGGER.warning("Partial failure sending product "
  655.                                     + sentId.toString());
  656.                         }
  657.                     }
  658.                 }

  659.                 LOGGER.info("Generated " + total + " products: " + successful
  660.                         + " sent, " + partialFailures + " partially sent, "
  661.                         + totalFailures + " failed to send");

  662.                 // notify of failures using exit code
  663.                 if (totalFailures > 0) {
  664.                     System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
  665.                 }
  666.                 if (partialFailures > 0) {
  667.                     System.exit(CLIProductBuilder.EXIT_PARTIALLY_SENT);
  668.                 }
  669.             } catch (Exception e) {
  670.                 if (e instanceof JAXBException
  671.                         && ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
  672.                     SAXParseException spe = (SAXParseException) ((JAXBException) e)
  673.                             .getLinkedException();
  674.                     LOGGER.severe("Parse error: " + spe.getMessage()
  675.                             + "; line=" + spe.getLineNumber() + ", column="
  676.                             + spe.getColumnNumber());
  677.                 } else {
  678.                     LOGGER.log(Level.SEVERE, "Exception while sending", e);
  679.                 }
  680.                 System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
  681.             }
  682.         }

  683.     }

  684.     /** Usage for interface */
  685.     public static void printUsage() {
  686.         System.err
  687.                 .println("\nUsage:\n\n"
  688.                         + "java -cp ProductClient.jar gov.usgs.earthquake.eids.EIDSInputWedge"
  689.                         + " ("
  690.                         + HELP_ARGUMENT
  691.                         + "|"
  692.                         + POLL_ARGUMENT
  693.                         + "|"
  694.                         + FILE_ARGUMENT
  695.                         + "FILE) ["
  696.                         + PRIVATE_KEY_ARGUMENT
  697.                         + "KEYFILE] ["
  698.                         + SIGNATURE_VERSION_ARGUMENT
  699.                         + "VERSION] ["
  700.                         + SERVERS_ARGUMENT
  701.                         + "SERVERS] ["
  702.                         + TEST_ARGUMENT
  703.                         + "] ["
  704.                         + CONNECT_TIMEOUT_ARGUMENT
  705.                         + "TIMEOUT] ["
  706.                         + PARSER_ARGUMENT
  707.                         + "PARSER] ["
  708.                         + POLLDIR_ARGUMENT
  709.                         + "POLLDIR] ["
  710.                         + POLL_INTERVAL_ARGUMENT
  711.                         + "INTERVAL] ["
  712.                         + STORAGEDIR_ARGUMENT
  713.                         + "STORAGEDIR] ["
  714.                         + ERRORDIR_ARGUMENT
  715.                         + "ERRORDIR] ["
  716.                         + ATTACH_ARGUMENT
  717.                         + "ATTACH] ["
  718.                         + SEND_ORIGINS_WITH_PHASES
  719.                         + "] ["
  720.                         + SEND_MECHANISMS_WITH_PHASES
  721.                         + "] ["
  722.                         + CREATE_INTERNAL_PRODUCTS
  723.                         + "] ["
  724.                         + CREATE_SCENARIO_PRODUCTS
  725.                         + "] ["
  726.                         + BINARY_FORMAT_ARGUMENT
  727.                         + "] ["
  728.                         + DISABLE_DEFLATE_ARGUMENT
  729.                         + "] ["
  730.                         + CLIProductBuilder.DISABLE_PARALLEL_SEND
  731.                         + "] ["
  732.                         + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT
  733.                         + "300]");

  734.         System.err.println();

  735.         System.err.println("\t" + HELP_ARGUMENT);
  736.         System.err.println("\t\tdisplay this message");
  737.         System.err.println("\t" + FILE_ARGUMENT + "FILE");
  738.         System.err.println("\t\tparse and send one file");
  739.         System.err.println("\t" + POLL_ARGUMENT);
  740.         System.err.println("\t\trun continuously, checking POLLDIR for files");

  741.         System.err.println();

  742.         System.err.println("\t" + PRIVATE_KEY_ARGUMENT + "KEYFILE");
  743.         System.err.println("\t\topenssh private key used to sign products");
  744.         System.err.println("\t" + SIGNATURE_VERSION_ARGUMENT + "VERSION");
  745.         System.err.println("\t\t'v1' is default, 'v2' is other option.");

  746.         System.err.println("\t" + CONNECT_TIMEOUT_ARGUMENT + "TIMEOUT");
  747.         System.err.println("\t\tmilliseconds before timeout while connecting");
  748.         System.err.println("\t\tdefault is \"" + DEFAULT_CONNECT_TIMEOUT
  749.                 + "\"ms");
  750.         System.err.println("\t\t(must appear before " + SERVERS_ARGUMENT + ")");

  751.         System.err.println("\t" + SERVERS_ARGUMENT + "SERVERS");
  752.         System.err
  753.                 .println("\t\tcomma delimited list of servers(host:port) where products are sent");
  754.         System.err.println("\t\tdefault is \"" + SERVERS_DEFAULT + "\"");
  755.         System.err.println("\t" + TEST_ARGUMENT);
  756.         System.err.println("\t\tPrint generated products to console for testing, ignores "
  757.                 + SERVERS_ARGUMENT);
  758.         System.err.println("\t" + PARSER_ARGUMENT + "PARSER");
  759.         System.err.println("\t\tclass that implements "
  760.                 + "gov.usgs.earthquake.quakeml.FileToQuakemlConverter");
  761.         System.err.println("\t\tdefault is \"" + DEFAULT_PARSER_CLASS + "\"");

  762.         System.err.println();

  763.         System.err.println("\t" + POLLDIR_ARGUMENT + "POLLDIR");
  764.         System.err.println("\t\tdirectory to poll for messages");

  765.         System.err.println("\t" + POLL_INTERVAL_ARGUMENT + "INTERVAL");
  766.         System.err.println("\t\tmilliseconds between polling");
  767.         System.err.println("\t\tdefault is \"" + DEFAULT_POLLINTERVAL + "\"ms");

  768.         System.err.println("\t" + STORAGEDIR_ARGUMENT + "STORAGEDIR");
  769.         System.err.println("\t\tdirectory for files that were processed");

  770.         System.err.println("\t" + ERRORDIR_ARGUMENT + "ERRORDIR");
  771.         System.err.println("\t\tdirectory for files that weren't processed");

  772.         System.err.println("\t" + ATTACH_ARGUMENT + "ATTACH");
  773.         System.err
  774.                 .println("\t\tattach a file or directory to one generated product, repeatable");
  775.         System.err
  776.                 .println("\t\tdirectory trees are preserved, each path must be unique");
  777.         System.err
  778.                 .println("\t\tif more than one product is generated, an exception will be thrown");
  779.         System.err.println("\t" + SEND_ORIGINS_WITH_PHASES);
  780.         System.err
  781.                 .println("\t\tWhen a phase-data product is generated, also send an origin product without the phase data");
  782.         System.err.println("\t" + SEND_MECHANISMS_WITH_PHASES);
  783.         System.err
  784.                 .println("\t\tWhen an phase-data product is generated, also send focal mechanism products without the phase data");
  785.         System.err.println();

  786.         System.err.println("\t" + CREATE_INTERNAL_PRODUCTS);
  787.         System.err
  788.                 .println("\t\tuse the product type prefix 'internal-' for all generated products");
  789.         System.err.println("\t" + CREATE_SCENARIO_PRODUCTS);
  790.         System.err
  791.                 .println("\t\tuse the product type suffix '-scenario' for all generated products");

  792.         System.err.println("\t" + BINARY_FORMAT_ARGUMENT);
  793.         System.err.println("\t\tsend to hub using binary format");

  794.         System.err.println("\t" + DISABLE_DEFLATE_ARGUMENT);
  795.         System.err.println("\t\tdisable deflate compression when sending to hubs");

  796.         System.err.println("\t" + CLIProductBuilder.DISABLE_PARALLEL_SEND);
  797.         System.err.println("\t\tsend to servers sequentially");

  798.         System.err.println("\t" + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT);
  799.         System.err.println("\t\ttimeout for parallel sends in seconds");

  800.         System.exit(1);
  801.     }

  802.     @Override
  803.     public void run() {
  804.         if (!polldir.exists()) {
  805.             polldir.mkdirs();
  806.         }
  807.         while (!Thread.currentThread().isInterrupted()) {
  808.             try {
  809.                 Date pollStart = new Date();

  810.                 String[] polldirFiles = polldir.list();
  811.                 if (polldirFiles.length > 0) {
  812.                     LOGGER.fine("Polldir contains " + polldirFiles.length
  813.                             + " files");
  814.                 }

  815.                 for (int i = 0, len = polldirFiles.length; i < len; i++) {
  816.                     File file = new File(polldir, polldirFiles[i]);
  817.                     try {
  818.                         if (pollCarefully) {
  819.                             // wait until file is at least pollInterval ms old,
  820.                             // in case it is still being written
  821.                             long age = new Date().getTime() - file.lastModified();
  822.                             if (age <= pollInterval) {
  823.                                 continue;
  824.                             }
  825.                         }

  826.                         onFile(file);

  827.                         if (storagedir != null) {
  828.                             if (!storagedir.exists()) {
  829.                                 storagedir.mkdirs();
  830.                             }
  831.                             file.renameTo(new File(storagedir, pollStart
  832.                                     .getTime() + "_" + file.getName()));
  833.                         } else {
  834.                             FileUtils.deleteTree(file);
  835.                         }
  836.                     } catch (Exception e) {
  837.                         if (errordir != null) {
  838.                             if (!errordir.exists()) {
  839.                                 errordir.mkdirs();
  840.                             }
  841.                             file.renameTo(new File(errordir, pollStart
  842.                                     .getTime() + "_" + file.getName()));
  843.                         } else {
  844.                             LOGGER.warning("Error processing file "
  845.                                     + file.getName()
  846.                                     + ", and no error directory configured");
  847.                             FileUtils.deleteTree(file);
  848.                         }
  849.                     }
  850.                 }

  851.                 Date pollEnd = new Date();
  852.                 Long pollTime = pollEnd.getTime() - pollStart.getTime();
  853.                 if (pollTime < pollInterval) {
  854.                     Thread.sleep(pollInterval - pollTime);
  855.                 }
  856.             } catch (InterruptedException ie) {
  857.                 // interrupted means shutdown
  858.                 return;
  859.             }
  860.         }

  861.     }

  862. }