ProductBuilder.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.util.Config;
- import gov.usgs.util.CryptoUtils;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.StringUtils;
- import gov.usgs.util.CryptoUtils.Version;
- import java.io.File;
- import java.net.MalformedURLException;
- import java.net.URL;
- import java.security.PrivateKey;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Essentials for building/sending products.
- *
- * This is the base class for other builders.
- *
- * Supported configurable properties:
- * <dl>
- * <dt>senders</dt>
- * <dd>A comma delimited list of product senders to use when sending
- * products.</dd>
- * <dt>trackerURL</dt>
- * <dd>Default tracker URL to assign to products that don't already have
- * one.</dd>
- * <dt>privateKeyFile</dt>
- * <dd>Path to a private key that can be used to sign products.</dd>
- * </dl>
- */
- public class ProductBuilder extends DefaultConfigurable {
- private static final Logger LOGGER = Logger.getLogger(ProductBuilder.class.getSimpleName());
- /** Configurable property for senders. */
- public static final String SENDERS_PROPERTY = "senders";
- /** Property name used for configuring a tracker url. */
- public static final String TRACKER_URL_PROPERTY = "trackerURL";
- /** Private key filename configuration property. */
- public static final String PRIVATE_KEY_PROPERTY = "privateKeyFile";
- /** Signature version property. */
- public static final String SIGNATURE_VERSION_PROPERTY = "signatureVersion";
- /** Send in parallel. */
- public static final String PARALLEL_SEND_PROPERTY = "parallelSend";
- /** Bool for parallel send */
- public static final String DEFAULT_PARALLEL_SEND = "true";
- /** Timeout in seconds for parallel send. */
- public static final String PARALLEL_SEND_TIMEOUT_PROPERTY = "parallelSendTimeout";
- /** time in ms for parallel send timemout */
- public static final String DEFAULT_PARALLEL_SEND_TIMEOUT = "300";
- /** Default tracker url. */
- public static final URL DEFAULT_TRACKER_URL;
- static {
- URL url = null;
- try {
- url = new URL("http://ehppdl1.cr.usgs.gov/tracker/");
- } catch (MalformedURLException mue) {
- LOGGER.severe("Failed to parse default tracker url.");
- System.exit(1);
- }
- DEFAULT_TRACKER_URL = url;
- }
- /** List of senders where built products are sent. */
- private List<ProductSender> senders = new LinkedList<ProductSender>();
- /** Default trackerURL to set on sent products. */
- private URL trackerURL;
- /** Key used to sign sent products. */
- private PrivateKey privateKey;
- /** Signature version. */
- private Version signatureVersion = Version.SIGNATURE_V1;
- /** Whether to send in parallel. */
- protected boolean parallelSend = true;
- /** How long to wait before parallel send timeout. */
- protected long parallelSendTimeout = 300L;
- /** Default product builder constructor */
- public ProductBuilder() {
- trackerURL = DEFAULT_TRACKER_URL;
- }
- /**
- * Send a product.
- *
- * If the product doesn't yet have a tracker URL, assigns current tracker URL to
- * product. If the product has not yet been signed, and a privateKey is
- * configured, signs the product before sending.
- *
- * @param product the product to send.
- * @return map of all exceptions thrown, from Sender to corresponding Exception.
- * @throws Exception if an error occurs while signing product.
- */
- public Map<ProductSender, Exception> sendProduct(final Product product) throws Exception {
- // doesn't already have a tracker url
- if (product.getTrackerURL() == null) {
- product.setTrackerURL(trackerURL);
- }
- // mark which version of client was used to create product
- product.getProperties().put(ProductClient.PDL_CLIENT_VERSION_PROPERTY, ProductClient.RELEASE_VERSION);
- // doesn't already have a signature.
- if (privateKey != null && product.getSignature() == null) {
- product.sign(privateKey, signatureVersion);
- }
- // send tracker update
- new ProductTracker(product.getTrackerURL()).productCreated(this.getName(), product.getId());
- // send product using all product senders.
- if (parallelSend) {
- return parallelSendProduct(senders, product, parallelSendTimeout);
- }
- // send sequentially if not parallel
- Map<ProductSender, Exception> errors = new HashMap<ProductSender, Exception>();
- Iterator<ProductSender> iter = new LinkedList<ProductSender>(senders).iterator();
- while (iter.hasNext()) {
- ProductSender sender = iter.next();
- try {
- sender.sendProduct(product);
- } catch (Exception e) {
- if (e instanceof ProductAlreadyInStorageException) {
- // condense this message...
- LOGGER.info("Product already in storage, id=" + product.getId().toString());
- } else {
- LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
- errors.put(sender, e);
- }
- }
- }
- return errors;
- }
- /**
- * @return list of product senders
- */
- public List<ProductSender> getProductSenders() {
- return senders;
- }
- /**
- * Add a ProductSender.
- *
- * @param sender to add
- */
- public void addProductSender(final ProductSender sender) {
- senders.add(sender);
- }
- /**
- * Remove a previously added ProductSender.
- *
- * @param sender to remove
- */
- public void removeProductSender(final ProductSender sender) {
- senders.remove(sender);
- }
- /** @return trackerURL */
- public URL getTrackerURL() {
- return trackerURL;
- }
- /** @param trackerURL to set */
- public void setTrackerURL(URL trackerURL) {
- this.trackerURL = trackerURL;
- }
- /** @return privateKey */
- public PrivateKey getPrivateKey() {
- return privateKey;
- }
- /** @param privateKey to set */
- public void setPrivateKey(PrivateKey privateKey) {
- this.privateKey = privateKey;
- }
- /** @return signatureVersion */
- public Version getSignatureVersion() {
- return signatureVersion;
- }
- /** @param signatureVersion to set */
- public void setSignatureVersion(Version signatureVersion) {
- this.signatureVersion = signatureVersion;
- }
- @Override
- public void configure(final Config config) throws Exception {
- Iterator<String> senderNames = StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",").iterator();
- while (senderNames.hasNext()) {
- String name = senderNames.next();
- LOGGER.config("Loading sender " + name);
- ProductSender sender = (ProductSender) Config.getConfig().getObject(name);
- if (sender == null) {
- throw new ConfigurationException("Unable to load sender '" + name + "', make sure it is properly configured.");
- }
- addProductSender(sender);
- }
- String url = config.getProperty(TRACKER_URL_PROPERTY);
- if (url != null) {
- trackerURL = new URL(url);
- }
- LOGGER.config("[" + getName() + "] Using tracker URL '" + trackerURL.toString() + "'");
- String keyFilename = config.getProperty(PRIVATE_KEY_PROPERTY);
- if (keyFilename != null) {
- LOGGER.config("[" + getName() + "] Loading private key file '" + keyFilename + "'");
- privateKey = CryptoUtils.readOpenSSHPrivateKey(StreamUtils.readStream(new File(keyFilename)), null);
- }
- String version = config.getProperty(SIGNATURE_VERSION_PROPERTY);
- if (version != null) {
- signatureVersion = Version.fromString(version);
- }
- LOGGER.config("[" + getName() + "] signature version = " + signatureVersion);
- parallelSend = Boolean.valueOf(config.getProperty(PARALLEL_SEND_PROPERTY, DEFAULT_PARALLEL_SEND));
- parallelSendTimeout = Long
- .valueOf(config.getProperty(PARALLEL_SEND_TIMEOUT_PROPERTY, DEFAULT_PARALLEL_SEND_TIMEOUT));
- LOGGER.config("[" + getName() + "] parallel send enabled=" + parallelSend + ", timeout=" + parallelSendTimeout);
- }
- @Override
- public void shutdown() throws Exception {
- Iterator<ProductSender> iter = senders.iterator();
- while (iter.hasNext()) {
- iter.next().shutdown();
- }
- }
- @Override
- public void startup() throws Exception {
- Iterator<ProductSender> iter = senders.iterator();
- while (iter.hasNext()) {
- iter.next().startup();
- }
- }
- /**
- * Send a product to all ProductSenders concurrently.
- *
- * @param senders the senders to receive product.
- * @param product the product to send.
- * @param timeoutSeconds number of seconds before timing out, interrupting any
- * pending send.
- * @return exceptions that occured while sending. If map is empty, there were no
- * exceptions.
- */
- public static Map<ProductSender, Exception> parallelSendProduct(final List<ProductSender> senders,
- final Product product, final long timeoutSeconds) {
- final Map<ProductSender, Boolean> sendComplete = Collections.synchronizedMap(new HashMap<ProductSender, Boolean>());
- final Map<ProductSender, Exception> sendExceptions = Collections
- .synchronizedMap(new HashMap<ProductSender, Exception>());
- Iterator<ProductSender> iter = senders.iterator();
- List<Callable<Void>> sendTasks = new ArrayList<Callable<Void>>();
- while (iter.hasNext()) {
- final ProductSender sender = iter.next();
- sendComplete.put(sender, false);
- sendTasks.add(() -> {
- try {
- sender.sendProduct(product);
- sendComplete.put(sender, true);
- } catch (Exception e) {
- if (e instanceof ProductAlreadyInStorageException) {
- LOGGER.info("Product already in storage, id=" + product.getId().toString());
- } else {
- LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
- sendExceptions.put(sender, e);
- }
- }
- return null;
- });
- }
- // run in parallel
- ExecutorService sendExecutor = Executors.newFixedThreadPool(senders.size());
- try {
- sendExecutor.invokeAll(sendTasks, timeoutSeconds, TimeUnit.SECONDS);
- } catch (Exception e) {
- // this may be Interupted, NullPointer, or RejectedExecution
- // in any case, this part is done and move on to checking send status
- }
- sendExecutor.shutdown();
- // check whether send completed or was interrupted
- for (ProductSender sender : sendComplete.keySet()) {
- if (!sendComplete.get(sender) && sendExceptions.get(sender) == null) {
- sendExceptions.put(sender, new InterruptedException());
- }
- }
- return sendExceptions;
- }
- }