EIDSInputWedge.java
package gov.usgs.earthquake.eids;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.bind.JAXBException;
import org.xml.sax.SAXParseException;
import gov.usgs.earthquake.distribution.Bootstrappable;
import gov.usgs.earthquake.distribution.CLIProductBuilder;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.ProductBuilder;
import gov.usgs.earthquake.distribution.ProductSender;
import gov.usgs.earthquake.distribution.SocketProductSender;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.quakeml.FileToQuakemlConverter;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.DirectoryPoller;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StringUtils;
import gov.usgs.util.CryptoUtils.Version;
/**
* Read messages from files or a poll directory, and push products into PDL.
*
* This is supports EIDS/QDDS style polling. The input messages are converted to
* Quakeml using the FileToQuakemlConverter interface, then sent as Quakeml
* based products.
*
* Much of the configuration can be supplied using either a configuration file,
* or command line arguments.
*/
public class EIDSInputWedge extends ProductBuilder implements Runnable,
Bootstrappable {
/**
* Global reference to arguments array, when EIDSInputWedge is run via
* Bootstrap.
*/
public static String[] ARGS = null;
private static final Logger LOGGER = Logger.getLogger(EIDSInputWedge.class
.getName());
/** Property for parser class */
public static final String PARSER_CLASS_PROPERTY = "parserClass";
/** Default parser class */
public static final String DEFAULT_PARSER_CLASS = "gov.usgs.earthquake.event.QuakemlToQuakemlConverter";
/** Property for polldir */
public static final String POLLDIR_PROPERTY = "directory";
/** Default polldir */
public static final String DEFAULT_POLLDIR = "polldir";
private File polldir = new File(DEFAULT_POLLDIR);
/** Property for storage directory */
public static final String STORAGEDIR_PROPERTY = "oldinputdir";
/** Default storage directory */
public static final String DEFAULT_STORAGEDIR = "oldinput";
private File storagedir = new File(DEFAULT_STORAGEDIR);
/** Property for error directory */
public static final String ERRORDIR_PROPERTY = "errordir";
/** Default error directory */
public static final String DEFAULT_ERRORDIR = "errordir";
private File errordir = new File(DEFAULT_ERRORDIR);
/** Property for validate */
public static final String VALIDATE_PROPERTY = "validate";
/** Default status of validate */
public static final String DEFAULT_VALIDATE = "false";
/** Property for sendOriginWhenPhasesExist */
public static final String SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY = "sendOriginWhenPhasesExist";
/** Default status of sendOrigin... */
public static final String DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST = "false";
/** Property for sendMechanismWhenPhasesExist */
public static final String SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY = "sendMechanismWhenPhasesExist";
/** Default status of sendMechanism... */
public static final String DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST = "false";
/** Convert parsed quakeml to a product. */
private ProductCreator productCreator = new QuakemlProductCreator();
/** Whether created products should be converted to internal types. */
public static final String CREATE_INTERNAL_PRODUCTS_PROPERTY = "createInternalProducts";
/** Default status of CREATE_INTERNAL_PRODUCTS */
public static final String DEFAULT_CREATE_INTERNAL_PRODUCTS = "false";
private boolean createInternalProducts = false;
/** Whether created products should be converted to scenario types. */
public static final String CREATE_SCENARIO_PRODUCTS_PROPERTY = "createScenarioProducts";
/** Default status of CREATE_SCENARIO_PRODUCTS */
public static final String DEFAULT_CREATE_SCENARIO_PRODUCTS = "false";
private boolean createScenarioProducts = false;
/** Directory polling object. */
private DirectoryPoller directoryPoller;
/** Poll interval property */
public static final String POLLINTERVAL_PROPERTY = "interval";
/** Default interval for POLLINTERVAL */
public static final String DEFAULT_POLLINTERVAL = "1000";
private long pollInterval = 1000L;
/** Property for pollCarefully */
public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
/** Default status of POLL_CAREFULLY */
public static final String DEFAULT_POLL_CAREFULLY = "false";
private boolean pollCarefully = false;
/** Property for doBufferFix */
public static final String DO_BUFFER_FIX_PROPERTY = "doBufferFix";
/** Default status of DO_BUFFER_FIX property */
public static final String DEFAULT_DO_BUFFER_FIX = "true";
private boolean doBufferFix = true;
private Thread pollThread = null;
/**
* Empty constructor
* @throws Exception if error occurs
*/
public EIDSInputWedge() throws Exception {
}
/**
* Gets products from file and iterates through each product
* During iteration, sets type to internal/scenario if createInternalProducts
* or createScenarioProducts is true. Attaches Content files to product,
* Sends product
* @param file File containing products
* @param attachContent Map of String and Content
* @return Map of product IDs and sent products
* @throws Exception if error occurs
*/
public Map<ProductId, Map<ProductSender, Exception>> parseAndSend(
final File file, final Map<String, Content> attachContent)
throws Exception {
Map<ProductId, Map<ProductSender, Exception>> sendProductResults = new HashMap<ProductId, Map<ProductSender, Exception>>();
List<Product> products = productCreator.getProducts(file);
Iterator<Product> iter = products.iterator();
while (iter.hasNext()) {
Product product = iter.next();
ProductId id = product.getId();
if (createInternalProducts) {
id.setType("internal-" + id.getType());
}
if (createScenarioProducts) {
id.setType(id.getType() + "-scenario");
}
// attach files to generated product
if (attachContent != null && attachContent.size() > 0) {
if (products.size() > 1) {
throw new Exception("Trying to attach files,"
+ " generated more than 1 product");
}
product.getContents().putAll(attachContent);
}
// send product, save any exceptions
sendProductResults.put(product.getId(), sendProduct(product));
}
return sendProductResults;
}
/**
* Parses given file, looking for send exceptions and reports statistics
* @param file to parse and look for errors
*/
public void onFile(File file) {
Date inputtime = new Date();
LOGGER.info("Reading file " + file.getName());
try {
Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
file, null);
// check how send went
int numSenders = getProductSenders().size();
int total = sendExceptions.size();
int successful = 0;
int partialFailures = 0;
int totalFailures = 0;
Iterator<ProductId> sentIds = sendExceptions.keySet().iterator();
while (sentIds.hasNext()) {
ProductId sentId = sentIds.next();
if (sendExceptions.get(sentId).size() == numSenders) {
totalFailures++;
LOGGER.severe("Total failure sending product "
+ sentId.toString());
} else {
// output built product id because it was sent at least once
System.out.println(sentId.toString());
if (sendExceptions.get(sentId).size() == 0) {
successful++;
} else {
partialFailures++;
LOGGER.warning("Partial failure sending product "
+ sentId.toString());
}
}
}
LOGGER.info("generated " + total + " products: " + successful
+ " sent, " + partialFailures + " partially sent, "
+ totalFailures + " failed to send");
// notify of failures using exit code
if (totalFailures > 0) {
// consider this failure, event if some products sent
throw new Exception();
}
// succeeded, at least somewhat
// move to oldinput directory
file.renameTo(new File(storagedir, file.getName() + "_"
+ inputtime.getTime()));
} catch (Exception e) {
if (e instanceof JAXBException
&& ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
SAXParseException spe = (SAXParseException) ((JAXBException) e)
.getLinkedException();
LOGGER.warning("Parse error: " + spe.getMessage() + "; line="
+ spe.getLineNumber() + ", column="
+ spe.getColumnNumber());
}
if (errordir != null) {
if (!errordir.exists()) {
errordir.mkdirs();
}
File errorfile = new File(errordir, file.getName() + "_"
+ new Date().getTime());
// move to error directory
try {
LOGGER.log(Level.WARNING, "Errors processing file, moving to "
+ errorfile.getCanonicalPath(), e);
} catch (Exception ignore) {
//ignore
}
file.renameTo(errorfile);
} else {
LOGGER.warning("Error processing file " + file.getName()
+ ", and no error directory configured");
FileUtils.deleteTree(file);
}
}
}
@Override
public void configure(Config config) throws Exception {
super.configure(config);
String parserClassName = config.getProperty(PARSER_CLASS_PROPERTY);
if (parserClassName == null) {
LOGGER.config("Using QuakemlToProductConverter");
} else {
Object parserObj = Class.forName(parserClassName)
.getConstructor().newInstance();
if (parserObj instanceof ProductCreator) {
productCreator = (ProductCreator) parserObj;
} else if (parserObj instanceof FileToQuakemlConverter) {
QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(doBufferFix);
quakemlCreator.setConverter((FileToQuakemlConverter) parserObj);
productCreator = quakemlCreator;
} else {
throw new ConfigurationException("configured parser class "
+ parserClassName + " does not implement "
+ FileToQuakemlConverter.class.getName());
}
LOGGER.config("Using parser class " + parserClassName);
}
boolean validate = Boolean.getBoolean(config.getProperty(
VALIDATE_PROPERTY, DEFAULT_VALIDATE));
productCreator.setValidate(validate);
LOGGER.config("Validation " + (validate ? "enabled" : "disabled"));
boolean sendOriginWhenPhasesExist = Boolean.valueOf(config
.getProperty(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY,
DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST));
if (productCreator instanceof EQMessageProductCreator) {
((EQMessageProductCreator) productCreator)
.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
} else {
((QuakemlProductCreator) productCreator)
.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
}
LOGGER.config("sendOriginWhenPhasesExist = "
+ sendOriginWhenPhasesExist);
boolean sendMechanismWhenPhasesExist = Boolean.valueOf(config
.getProperty(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY,
DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST));
if (productCreator instanceof EQMessageProductCreator) {
if (sendMechanismWhenPhasesExist) {
LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY
+ " is not supported for EQMessageProductCreator");
}
} else {
((QuakemlProductCreator) productCreator)
.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
}
LOGGER.config("sendMechanismWhenPhasesExist = "
+ sendMechanismWhenPhasesExist);
polldir = new File(
config.getProperty(POLLDIR_PROPERTY, DEFAULT_POLLDIR));
LOGGER.config("Using poll directory " + polldir.getCanonicalPath());
pollInterval = Long.valueOf(config.getProperty(POLLINTERVAL_PROPERTY,
DEFAULT_POLLINTERVAL));
LOGGER.config("Using poll interval " + pollInterval + "ms");
pollCarefully = Boolean.valueOf(config.getProperty(POLL_CAREFULLY_PROPERTY,
DEFAULT_POLL_CAREFULLY));
LOGGER.config("Poll carefully = " + pollCarefully);
storagedir = new File(config.getProperty(STORAGEDIR_PROPERTY,
DEFAULT_STORAGEDIR));
LOGGER.config("Using oldinput directory "
+ storagedir.getCanonicalPath());
errordir = new File(config.getProperty(ERRORDIR_PROPERTY,
DEFAULT_ERRORDIR));
LOGGER.config("Using error directory " + errordir.getCanonicalPath());
createInternalProducts = Boolean.valueOf(config
.getProperty(CREATE_INTERNAL_PRODUCTS_PROPERTY,
DEFAULT_CREATE_INTERNAL_PRODUCTS));
LOGGER.config("createInternalProducts = " + createInternalProducts);
createScenarioProducts = Boolean.valueOf(config
.getProperty(CREATE_SCENARIO_PRODUCTS_PROPERTY,
DEFAULT_CREATE_SCENARIO_PRODUCTS));
LOGGER.config("createScenarioProducts = " + createScenarioProducts);
doBufferFix = Boolean.valueOf(config
.getProperty(DO_BUFFER_FIX_PROPERTY,
DEFAULT_DO_BUFFER_FIX));
LOGGER.config("doBufferFix = " + doBufferFix);
}
@Override
public void shutdown() throws Exception {
if (pollThread != null) {
pollThread.interrupt();
pollThread = null;
}
super.shutdown();
}
@Override
public void startup() throws Exception {
super.startup();
if (pollThread == null) {
pollThread = new Thread(this);
pollThread.setName("poll thread");
pollThread.start();
}
}
/** @return polldir */
public File getPolldir() {
return polldir;
}
/** @param polldir File to set */
public void setPolldir(File polldir) {
this.polldir = polldir;
}
/** @return storagedir */
public File getStoragedir() {
return storagedir;
}
/** @param storagedir File to set */
public void setStoragedir(File storagedir) {
this.storagedir = storagedir;
}
/** @return errordir */
public File getErrordir() {
return errordir;
}
/** @param errordir File to send */
public void setErrordir(File errordir) {
this.errordir = errordir;
}
/** @return productCreator */
public ProductCreator getProductCreator() {
return productCreator;
}
/** @param productCreator to set */
public void setProductCreator(ProductCreator productCreator) {
this.productCreator = productCreator;
}
/** @return directoryPoller */
public DirectoryPoller getDirectoryPoller() {
return directoryPoller;
}
/** @param directoryPoller to set */
public void setDirectoryPoller(DirectoryPoller directoryPoller) {
this.directoryPoller = directoryPoller;
}
/** @return pollInterval long */
public long getPollInterval() {
return pollInterval;
}
/** @param pollInterval long to set */
public void setPollInterval(long pollInterval) {
this.pollInterval = pollInterval;
}
/** @return pollCarefully boolean */
public boolean isPollCarefully() {
return pollCarefully;
}
/** @param pollCarefully boolean to set */
public void setPollCarefully(boolean pollCarefully) {
this.pollCarefully = pollCarefully;
}
/**
* @return the createInternalProducts
*/
public boolean isCreateInternalProducts() {
return createInternalProducts;
}
/**
* @param createInternalProducts
* the createInternalProducts to set
*/
public void setCreateInternalProducts(boolean createInternalProducts) {
this.createInternalProducts = createInternalProducts;
}
/**
* @return the createScenarioProducts
*/
public boolean isCreateScenarioProducts() {
return createScenarioProducts;
}
/**
* @param createScenarioProducts
* the createScenarioProducts to set
*/
public void setCreateScenarioProducts(boolean createScenarioProducts) {
this.createScenarioProducts = createScenarioProducts;
}
/**
* Parses a string of servers into SocketProductSenders, all put into a list
* of product senders
* @param servers String of servers, split by commas
* @param connectTimeout int timeout
* @param binaryFormat boolean if binary format
* @param enableDeflate boolean if Deflate should be enabled
* @return List of product senders
*/
public static List<ProductSender> parseServers(final String servers,
final Integer connectTimeout, final boolean binaryFormat,
final boolean enableDeflate) {
List<ProductSender> senders = new ArrayList<ProductSender>();
Iterator<String> iter = StringUtils.split(servers, ",").iterator();
while (iter.hasNext()) {
String server = iter.next();
String[] parts = server.split(":");
SocketProductSender sender = new SocketProductSender(parts[0],
Integer.parseInt(parts[1]), connectTimeout);
sender.setBinaryFormat(binaryFormat);
sender.setEnableDeflate(enableDeflate);
senders.add(sender);
}
return senders;
}
/** Argument for help */
public static final String HELP_ARGUMENT = "--help";
/** Argument for poll */
public static final String POLL_ARGUMENT = "--poll";
/** Argument for poleCarefully */
public static final String POLL_CAREFULLY_ARGUMENT = "--pollCarefully";
/** Argument for polldir */
public static final String POLLDIR_ARGUMENT = "--polldir=";
/** Argument for errordir */
public static final String ERRORDIR_ARGUMENT = "--errordir=";
/** Argument for storagedir */
public static final String STORAGEDIR_ARGUMENT = "--oldinputdir=";
/** Argument for poll interval */
public static final String POLL_INTERVAL_ARGUMENT = "--pollInterval=";
/** Argument for tracker url */
public static final String TRACKER_URL_ARGUMENT = "--trackerURL=";
/** Argument for file */
public static final String FILE_ARGUMENT = "--file=";
/** Argument for parser */
public static final String PARSER_ARGUMENT = "--parser=";
/** Argument for validate */
public static final String VALIDATE_ARGUMENT = "--validate";
/** Argument for privateKey */
public static final String PRIVATE_KEY_ARGUMENT = "--privateKey=";
/** Argument for signatureVersion */
public static final String SIGNATURE_VERSION_ARGUMENT = "--signatureVersion=";
/** Argument for servers */
public static final String SERVERS_ARGUMENT = "--servers=";
/** Default server for server argument */
public static final String SERVERS_DEFAULT = "prod01-pdl01.cr.usgs.gov:11235,prod02-pdl01.cr.usgs.gov:11235";
/** Argument for connection timeout */
public static final String CONNECT_TIMEOUT_ARGUMENT = "--connectTimeout=";
/** Default timeout for connection */
public static final Integer DEFAULT_CONNECT_TIMEOUT = 15000;
/** Argument for binaryFormat */
public static final String BINARY_FORMAT_ARGUMENT = "--binaryFormat";
/** Argument for disableDeflate */
public static final String DISABLE_DEFLATE_ARGUMENT = "--disableDeflate";
/** Argument for attach */
public static final String ATTACH_ARGUMENT = "--attach=";
/** Argument for sending origin with phases */
public static final String SEND_ORIGINS_WITH_PHASES = "--sendOriginWhenPhasesExist";
/** Argument for sending mechanisms with phases */
public static final String SEND_MECHANISMS_WITH_PHASES = "--sendMechanismWhenPhasesExist";
/** Argument for creating internal products */
public static final String CREATE_INTERNAL_PRODUCTS = "--internal";
/** Argument for creating scenario products */
public static final String CREATE_SCENARIO_PRODUCTS = "--scenario";
/** Argument for testing */
public static final String TEST_ARGUMENT = "--test";
/**
* Bootstrappable interface.
*/
@Override
public void run(final String[] args) throws Exception {
// save arguments in global for access by FileToQuakemlParser objects.
EIDSInputWedge.ARGS = args;
boolean test = false;
boolean help = false;
boolean poll = false;
boolean validate = this.productCreator.isValidate();
boolean sendOriginWhenPhasesExist = false;
boolean sendMechanismWhenPhasesExist = false;
// preserve any existing settings from config file
if (productCreator instanceof QuakemlProductCreator) {
sendOriginWhenPhasesExist = ((QuakemlProductCreator) productCreator)
.isSendOriginWhenPhasesExist();
sendMechanismWhenPhasesExist = ((QuakemlProductCreator) productCreator)
.isSendMechanismWhenPhasesExist();
} else if (productCreator instanceof EQMessageProductCreator) {
sendOriginWhenPhasesExist = ((EQMessageProductCreator) productCreator)
.isSendOriginWhenPhasesExist();
}
File file = null;
// when sending 1 product, allow extra files to be attached.
HashMap<String, Content> attachContent = new HashMap<String, Content>();
Integer connectTimeout = DEFAULT_CONNECT_TIMEOUT;
boolean binaryFormat = false;
boolean enableDeflate = true;
StringBuffer arguments = new StringBuffer();
for (String arg : args) {
arguments.append(arg).append(" ");
if (arg.equals(HELP_ARGUMENT)) {
help = true;
} else if (arg.equals(POLL_ARGUMENT)) {
poll = true;
} else if (arg.equals(POLL_CAREFULLY_ARGUMENT)) {
pollCarefully = true;
} else if (arg.equals(SEND_ORIGINS_WITH_PHASES)) {
sendOriginWhenPhasesExist = true;
} else if (arg.equals(SEND_MECHANISMS_WITH_PHASES)) {
sendMechanismWhenPhasesExist = true;
} else if (arg.startsWith(POLLDIR_ARGUMENT)) {
setPolldir(new File(arg.replace(POLLDIR_ARGUMENT, "")));
} else if (arg.startsWith(ERRORDIR_ARGUMENT)) {
setErrordir(new File(arg.replace(ERRORDIR_ARGUMENT, "")));
} else if (arg.startsWith(STORAGEDIR_ARGUMENT)) {
setStoragedir(new File(arg.replace(STORAGEDIR_ARGUMENT, "")));
} else if (arg.startsWith(FILE_ARGUMENT)) {
file = new File(arg.replace(FILE_ARGUMENT, ""));
} else if (arg.startsWith(ATTACH_ARGUMENT)) {
File attach = new File(arg.replace(ATTACH_ARGUMENT, ""));
if (attach.isDirectory()) {
attachContent.putAll(FileContent
.getDirectoryContents(attach));
} else {
attachContent
.put(attach.getName(), new FileContent(attach));
}
} else if (arg.startsWith(PARSER_ARGUMENT)) {
Object parser = Class.forName(arg.replace(PARSER_ARGUMENT, ""))
.getConstructor().newInstance();
if (parser instanceof ProductCreator) {
setProductCreator((ProductCreator) parser);
} else {
QuakemlProductCreator productCreator = new QuakemlProductCreator();
productCreator
.setConverter((FileToQuakemlConverter) parser);
setProductCreator(productCreator);
}
} else if (arg.startsWith(VALIDATE_ARGUMENT)) {
validate = true;
} else if (arg.startsWith(SERVERS_ARGUMENT)) {
// ignore servers argument when in test mode
if (!test) {
getProductSenders().clear();
getProductSenders().addAll(
parseServers(arg.replace(SERVERS_ARGUMENT, ""),
connectTimeout, binaryFormat, enableDeflate));
}
} else if (arg.startsWith(TEST_ARGUMENT)) {
test = true;
getProductSenders().clear();
getProductSenders().add(new DebugProductSender());
} else if (arg.startsWith(PRIVATE_KEY_ARGUMENT)) {
setPrivateKey(CryptoUtils.readOpenSSHPrivateKey(FileUtils
.readFile(new File(arg
.replace(PRIVATE_KEY_ARGUMENT, ""))), null));
} else if (arg.startsWith(SIGNATURE_VERSION_ARGUMENT)) {
setSignatureVersion(Version.fromString(
arg.replace(SIGNATURE_VERSION_ARGUMENT, "")));
} else if (arg.startsWith(CONNECT_TIMEOUT_ARGUMENT)) {
connectTimeout = Integer.valueOf(arg.replace(
CONNECT_TIMEOUT_ARGUMENT, ""));
} else if (arg.equals(BINARY_FORMAT_ARGUMENT)) {
binaryFormat = true;
} else if (arg.equals(DISABLE_DEFLATE_ARGUMENT)) {
enableDeflate = false;
} else if (arg.startsWith(POLL_INTERVAL_ARGUMENT)) {
setPollInterval(Long.valueOf(arg.replace(
POLL_INTERVAL_ARGUMENT, "")));
} else if (arg.startsWith(TRACKER_URL_ARGUMENT)) {
this.setTrackerURL(new URL(arg
.replace(TRACKER_URL_ARGUMENT, "")));
} else if (arg.equals(CREATE_INTERNAL_PRODUCTS)) {
createInternalProducts = true;
} else if (arg.equals(CREATE_SCENARIO_PRODUCTS)) {
createScenarioProducts = true;
} else if (arg.equals(CLIProductBuilder.DISABLE_PARALLEL_SEND)) {
parallelSend = false;
} else if (arg.startsWith(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT)) {
parallelSendTimeout = Long.valueOf(
arg.replace(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT, ""));
}
}
ProductCreator creator = getProductCreator();
creator.setValidate(validate);
if (creator instanceof EQMessageProductCreator) {
((EQMessageProductCreator) creator)
.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
} else if (creator instanceof QuakemlProductCreator) {
QuakemlProductCreator quakemlCreator = ((QuakemlProductCreator) creator);
quakemlCreator
.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
quakemlCreator
.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
}
if (
// want usage, or didn't provide arguments
(help || args.length == 0)
// or didn't provide correct arguments
|| (!poll && file == null)) {
printUsage();
}
// run continuously
else if (poll) {
startup();
}
// send, then shutdown
else {
// file != null
try {
// send
Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(
file, attachContent);
// check how send went
int numSenders = getProductSenders().size();
int total = sendExceptions.size();
int successful = 0;
int partialFailures = 0;
int totalFailures = 0;
Iterator<ProductId> sentIds = sendExceptions.keySet()
.iterator();
while (sentIds.hasNext()) {
ProductId sentId = sentIds.next();
if (sendExceptions.get(sentId).size() == numSenders) {
totalFailures++;
LOGGER.severe("Total failure sending product "
+ sentId.toString());
} else {
// output built product id because it was sent at least
// once
System.out.println(sentId.toString());
if (sendExceptions.get(sentId).size() == 0) {
successful++;
} else {
partialFailures++;
LOGGER.warning("Partial failure sending product "
+ sentId.toString());
}
}
}
LOGGER.info("Generated " + total + " products: " + successful
+ " sent, " + partialFailures + " partially sent, "
+ totalFailures + " failed to send");
// notify of failures using exit code
if (totalFailures > 0) {
System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
}
if (partialFailures > 0) {
System.exit(CLIProductBuilder.EXIT_PARTIALLY_SENT);
}
} catch (Exception e) {
if (e instanceof JAXBException
&& ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
SAXParseException spe = (SAXParseException) ((JAXBException) e)
.getLinkedException();
LOGGER.severe("Parse error: " + spe.getMessage()
+ "; line=" + spe.getLineNumber() + ", column="
+ spe.getColumnNumber());
} else {
LOGGER.log(Level.SEVERE, "Exception while sending", e);
}
System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
}
}
}
/** Usage for interface */
public static void printUsage() {
System.err
.println("\nUsage:\n\n"
+ "java -cp ProductClient.jar gov.usgs.earthquake.eids.EIDSInputWedge"
+ " ("
+ HELP_ARGUMENT
+ "|"
+ POLL_ARGUMENT
+ "|"
+ FILE_ARGUMENT
+ "FILE) ["
+ PRIVATE_KEY_ARGUMENT
+ "KEYFILE] ["
+ SIGNATURE_VERSION_ARGUMENT
+ "VERSION] ["
+ SERVERS_ARGUMENT
+ "SERVERS] ["
+ TEST_ARGUMENT
+ "] ["
+ CONNECT_TIMEOUT_ARGUMENT
+ "TIMEOUT] ["
+ PARSER_ARGUMENT
+ "PARSER] ["
+ POLLDIR_ARGUMENT
+ "POLLDIR] ["
+ POLL_INTERVAL_ARGUMENT
+ "INTERVAL] ["
+ STORAGEDIR_ARGUMENT
+ "STORAGEDIR] ["
+ ERRORDIR_ARGUMENT
+ "ERRORDIR] ["
+ ATTACH_ARGUMENT
+ "ATTACH] ["
+ SEND_ORIGINS_WITH_PHASES
+ "] ["
+ SEND_MECHANISMS_WITH_PHASES
+ "] ["
+ CREATE_INTERNAL_PRODUCTS
+ "] ["
+ CREATE_SCENARIO_PRODUCTS
+ "] ["
+ BINARY_FORMAT_ARGUMENT
+ "] ["
+ DISABLE_DEFLATE_ARGUMENT
+ "] ["
+ CLIProductBuilder.DISABLE_PARALLEL_SEND
+ "] ["
+ CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT
+ "300]");
System.err.println();
System.err.println("\t" + HELP_ARGUMENT);
System.err.println("\t\tdisplay this message");
System.err.println("\t" + FILE_ARGUMENT + "FILE");
System.err.println("\t\tparse and send one file");
System.err.println("\t" + POLL_ARGUMENT);
System.err.println("\t\trun continuously, checking POLLDIR for files");
System.err.println();
System.err.println("\t" + PRIVATE_KEY_ARGUMENT + "KEYFILE");
System.err.println("\t\topenssh private key used to sign products");
System.err.println("\t" + SIGNATURE_VERSION_ARGUMENT + "VERSION");
System.err.println("\t\t'v1' is default, 'v2' is other option.");
System.err.println("\t" + CONNECT_TIMEOUT_ARGUMENT + "TIMEOUT");
System.err.println("\t\tmilliseconds before timeout while connecting");
System.err.println("\t\tdefault is \"" + DEFAULT_CONNECT_TIMEOUT
+ "\"ms");
System.err.println("\t\t(must appear before " + SERVERS_ARGUMENT + ")");
System.err.println("\t" + SERVERS_ARGUMENT + "SERVERS");
System.err
.println("\t\tcomma delimited list of servers(host:port) where products are sent");
System.err.println("\t\tdefault is \"" + SERVERS_DEFAULT + "\"");
System.err.println("\t" + TEST_ARGUMENT);
System.err.println("\t\tPrint generated products to console for testing, ignores "
+ SERVERS_ARGUMENT);
System.err.println("\t" + PARSER_ARGUMENT + "PARSER");
System.err.println("\t\tclass that implements "
+ "gov.usgs.earthquake.quakeml.FileToQuakemlConverter");
System.err.println("\t\tdefault is \"" + DEFAULT_PARSER_CLASS + "\"");
System.err.println();
System.err.println("\t" + POLLDIR_ARGUMENT + "POLLDIR");
System.err.println("\t\tdirectory to poll for messages");
System.err.println("\t" + POLL_INTERVAL_ARGUMENT + "INTERVAL");
System.err.println("\t\tmilliseconds between polling");
System.err.println("\t\tdefault is \"" + DEFAULT_POLLINTERVAL + "\"ms");
System.err.println("\t" + STORAGEDIR_ARGUMENT + "STORAGEDIR");
System.err.println("\t\tdirectory for files that were processed");
System.err.println("\t" + ERRORDIR_ARGUMENT + "ERRORDIR");
System.err.println("\t\tdirectory for files that weren't processed");
System.err.println("\t" + ATTACH_ARGUMENT + "ATTACH");
System.err
.println("\t\tattach a file or directory to one generated product, repeatable");
System.err
.println("\t\tdirectory trees are preserved, each path must be unique");
System.err
.println("\t\tif more than one product is generated, an exception will be thrown");
System.err.println("\t" + SEND_ORIGINS_WITH_PHASES);
System.err
.println("\t\tWhen a phase-data product is generated, also send an origin product without the phase data");
System.err.println("\t" + SEND_MECHANISMS_WITH_PHASES);
System.err
.println("\t\tWhen an phase-data product is generated, also send focal mechanism products without the phase data");
System.err.println();
System.err.println("\t" + CREATE_INTERNAL_PRODUCTS);
System.err
.println("\t\tuse the product type prefix 'internal-' for all generated products");
System.err.println("\t" + CREATE_SCENARIO_PRODUCTS);
System.err
.println("\t\tuse the product type suffix '-scenario' for all generated products");
System.err.println("\t" + BINARY_FORMAT_ARGUMENT);
System.err.println("\t\tsend to hub using binary format");
System.err.println("\t" + DISABLE_DEFLATE_ARGUMENT);
System.err.println("\t\tdisable deflate compression when sending to hubs");
System.err.println("\t" + CLIProductBuilder.DISABLE_PARALLEL_SEND);
System.err.println("\t\tsend to servers sequentially");
System.err.println("\t" + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT);
System.err.println("\t\ttimeout for parallel sends in seconds");
System.exit(1);
}
@Override
public void run() {
if (!polldir.exists()) {
polldir.mkdirs();
}
while (!Thread.currentThread().isInterrupted()) {
try {
Date pollStart = new Date();
String[] polldirFiles = polldir.list();
if (polldirFiles.length > 0) {
LOGGER.fine("Polldir contains " + polldirFiles.length
+ " files");
}
for (int i = 0, len = polldirFiles.length; i < len; i++) {
File file = new File(polldir, polldirFiles[i]);
try {
if (pollCarefully) {
// wait until file is at least pollInterval ms old,
// in case it is still being written
long age = new Date().getTime() - file.lastModified();
if (age <= pollInterval) {
continue;
}
}
onFile(file);
if (storagedir != null) {
if (!storagedir.exists()) {
storagedir.mkdirs();
}
file.renameTo(new File(storagedir, pollStart
.getTime() + "_" + file.getName()));
} else {
FileUtils.deleteTree(file);
}
} catch (Exception e) {
if (errordir != null) {
if (!errordir.exists()) {
errordir.mkdirs();
}
file.renameTo(new File(errordir, pollStart
.getTime() + "_" + file.getName()));
} else {
LOGGER.warning("Error processing file "
+ file.getName()
+ ", and no error directory configured");
FileUtils.deleteTree(file);
}
}
}
Date pollEnd = new Date();
Long pollTime = pollEnd.getTime() - pollStart.getTime();
if (pollTime < pollInterval) {
Thread.sleep(pollInterval - pollTime);
}
} catch (InterruptedException ie) {
// interrupted means shutdown
return;
}
}
}
}