ProductBuilder.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.earthquake.product.Product;
  3. import gov.usgs.util.Config;
  4. import gov.usgs.util.CryptoUtils;
  5. import gov.usgs.util.DefaultConfigurable;
  6. import gov.usgs.util.StreamUtils;
  7. import gov.usgs.util.StringUtils;
  8. import gov.usgs.util.CryptoUtils.Version;

  9. import java.io.File;
  10. import java.net.MalformedURLException;
  11. import java.net.URL;
  12. import java.security.PrivateKey;
  13. import java.util.ArrayList;
  14. import java.util.Collections;
  15. import java.util.HashMap;
  16. import java.util.Iterator;
  17. import java.util.LinkedList;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.concurrent.Callable;
  21. import java.util.concurrent.ExecutorService;
  22. import java.util.concurrent.Executors;
  23. import java.util.concurrent.TimeUnit;
  24. import java.util.logging.Level;
  25. import java.util.logging.Logger;

  26. /**
  27.  * Essentials for building/sending products.
  28.  *
  29.  * This is the base class for other builders.
  30.  *
  31.  * Supported configurable properties:
  32.  * <dl>
  33.  * <dt>senders</dt>
  34.  * <dd>A comma delimited list of product senders to use when sending
  35.  * products.</dd>
  36.  * <dt>trackerURL</dt>
  37.  * <dd>Default tracker URL to assign to products that don't already have
  38.  * one.</dd>
  39.  * <dt>privateKeyFile</dt>
  40.  * <dd>Path to a private key that can be used to sign products.</dd>
  41.  * </dl>
  42.  */
  43. public class ProductBuilder extends DefaultConfigurable {

  44.     private static final Logger LOGGER = Logger.getLogger(ProductBuilder.class.getSimpleName());

  45.     /** Configurable property for senders. */
  46.     public static final String SENDERS_PROPERTY = "senders";

  47.     /** Property name used for configuring a tracker url. */
  48.     public static final String TRACKER_URL_PROPERTY = "trackerURL";

  49.     /** Private key filename configuration property. */
  50.     public static final String PRIVATE_KEY_PROPERTY = "privateKeyFile";

  51.     /** Signature version property. */
  52.     public static final String SIGNATURE_VERSION_PROPERTY = "signatureVersion";

  53.     /** Send in parallel. */
  54.     public static final String PARALLEL_SEND_PROPERTY = "parallelSend";
  55.     /** Bool for parallel send */
  56.     public static final String DEFAULT_PARALLEL_SEND = "true";

  57.     /** Timeout in seconds for parallel send. */
  58.     public static final String PARALLEL_SEND_TIMEOUT_PROPERTY = "parallelSendTimeout";
  59.     /** time in ms for parallel send timemout */
  60.     public static final String DEFAULT_PARALLEL_SEND_TIMEOUT = "300";

  61.     /** Default tracker url. */
  62.     public static final URL DEFAULT_TRACKER_URL;
  63.     static {
  64.         URL url = null;
  65.         try {
  66.             url = new URL("http://ehppdl1.cr.usgs.gov/tracker/");
  67.         } catch (MalformedURLException mue) {
  68.             LOGGER.severe("Failed to parse default tracker url.");
  69.             System.exit(1);
  70.         }
  71.         DEFAULT_TRACKER_URL = url;
  72.     }

  73.     /** List of senders where built products are sent. */
  74.     private List<ProductSender> senders = new LinkedList<ProductSender>();

  75.     /** Default trackerURL to set on sent products. */
  76.     private URL trackerURL;

  77.     /** Key used to sign sent products. */
  78.     private PrivateKey privateKey;

  79.     /** Signature version. */
  80.     private Version signatureVersion = Version.SIGNATURE_V1;

  81.     /** Whether to send in parallel. */
  82.     protected boolean parallelSend = true;

  83.     /** How long to wait before parallel send timeout. */
  84.     protected long parallelSendTimeout = 300L;

  85.     /** Default product builder constructor */
  86.     public ProductBuilder() {
  87.         trackerURL = DEFAULT_TRACKER_URL;
  88.     }

  89.     /**
  90.      * Send a product.
  91.      *
  92.      * If the product doesn't yet have a tracker URL, assigns current tracker URL to
  93.      * product. If the product has not yet been signed, and a privateKey is
  94.      * configured, signs the product before sending.
  95.      *
  96.      * @param product the product to send.
  97.      * @return map of all exceptions thrown, from Sender to corresponding Exception.
  98.      * @throws Exception if an error occurs while signing product.
  99.      */
  100.     public Map<ProductSender, Exception> sendProduct(final Product product) throws Exception {

  101.         // doesn't already have a tracker url
  102.         if (product.getTrackerURL() == null) {
  103.             product.setTrackerURL(trackerURL);
  104.         }

  105.         // mark which version of client was used to create product
  106.         product.getProperties().put(ProductClient.PDL_CLIENT_VERSION_PROPERTY, ProductClient.RELEASE_VERSION);

  107.         // doesn't already have a signature.
  108.         if (privateKey != null && product.getSignature() == null) {
  109.             product.sign(privateKey, signatureVersion);
  110.         }

  111.         // send tracker update
  112.         new ProductTracker(product.getTrackerURL()).productCreated(this.getName(), product.getId());

  113.         // send product using all product senders.
  114.         if (parallelSend) {
  115.             return parallelSendProduct(senders, product, parallelSendTimeout);
  116.         }

  117.         // send sequentially if not parallel
  118.         Map<ProductSender, Exception> errors = new HashMap<ProductSender, Exception>();
  119.         Iterator<ProductSender> iter = new LinkedList<ProductSender>(senders).iterator();
  120.         while (iter.hasNext()) {
  121.             ProductSender sender = iter.next();
  122.             try {
  123.                 sender.sendProduct(product);
  124.             } catch (Exception e) {
  125.                 if (e instanceof ProductAlreadyInStorageException) {
  126.                     // condense this message...
  127.                     LOGGER.info("Product already in storage, id=" + product.getId().toString());
  128.                 } else {
  129.                     LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
  130.                     errors.put(sender, e);
  131.                 }
  132.             }
  133.         }

  134.         return errors;
  135.     }

  136.     /**
  137.      * @return list of product senders
  138.      */
  139.     public List<ProductSender> getProductSenders() {
  140.         return senders;
  141.     }

  142.     /**
  143.      * Add a ProductSender.
  144.      *
  145.      * @param sender to add
  146.      */
  147.     public void addProductSender(final ProductSender sender) {
  148.         senders.add(sender);
  149.     }

  150.     /**
  151.      * Remove a previously added ProductSender.
  152.      *
  153.      * @param sender to remove
  154.      */
  155.     public void removeProductSender(final ProductSender sender) {
  156.         senders.remove(sender);
  157.     }

  158.     /** @return trackerURL */
  159.     public URL getTrackerURL() {
  160.         return trackerURL;
  161.     }

  162.     /** @param trackerURL to set */
  163.     public void setTrackerURL(URL trackerURL) {
  164.         this.trackerURL = trackerURL;
  165.     }

  166.     /** @return privateKey */
  167.     public PrivateKey getPrivateKey() {
  168.         return privateKey;
  169.     }

  170.     /** @param privateKey to set */
  171.     public void setPrivateKey(PrivateKey privateKey) {
  172.         this.privateKey = privateKey;
  173.     }

  174.     /** @return signatureVersion */
  175.     public Version getSignatureVersion() {
  176.         return signatureVersion;
  177.     }

  178.     /** @param signatureVersion to set */
  179.     public void setSignatureVersion(Version signatureVersion) {
  180.         this.signatureVersion = signatureVersion;
  181.     }

  182.     @Override
  183.     public void configure(final Config config) throws Exception {
  184.         Iterator<String> senderNames = StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",").iterator();
  185.         while (senderNames.hasNext()) {
  186.             String name = senderNames.next();
  187.             LOGGER.config("Loading sender " + name);

  188.             ProductSender sender = (ProductSender) Config.getConfig().getObject(name);
  189.             if (sender == null) {
  190.                 throw new ConfigurationException("Unable to load sender '" + name + "', make sure it is properly configured.");
  191.             }
  192.             addProductSender(sender);
  193.         }

  194.         String url = config.getProperty(TRACKER_URL_PROPERTY);
  195.         if (url != null) {
  196.             trackerURL = new URL(url);
  197.         }
  198.         LOGGER.config("[" + getName() + "] Using tracker URL '" + trackerURL.toString() + "'");

  199.         String keyFilename = config.getProperty(PRIVATE_KEY_PROPERTY);
  200.         if (keyFilename != null) {
  201.             LOGGER.config("[" + getName() + "] Loading private key file '" + keyFilename + "'");
  202.             privateKey = CryptoUtils.readOpenSSHPrivateKey(StreamUtils.readStream(new File(keyFilename)), null);
  203.         }

  204.         String version = config.getProperty(SIGNATURE_VERSION_PROPERTY);
  205.         if (version != null) {
  206.             signatureVersion = Version.fromString(version);
  207.         }
  208.         LOGGER.config("[" + getName() + "] signature version = " + signatureVersion);

  209.         parallelSend = Boolean.valueOf(config.getProperty(PARALLEL_SEND_PROPERTY, DEFAULT_PARALLEL_SEND));
  210.         parallelSendTimeout = Long
  211.                 .valueOf(config.getProperty(PARALLEL_SEND_TIMEOUT_PROPERTY, DEFAULT_PARALLEL_SEND_TIMEOUT));
  212.         LOGGER.config("[" + getName() + "] parallel send enabled=" + parallelSend + ", timeout=" + parallelSendTimeout);
  213.     }

  214.     @Override
  215.     public void shutdown() throws Exception {
  216.         Iterator<ProductSender> iter = senders.iterator();
  217.         while (iter.hasNext()) {
  218.             iter.next().shutdown();
  219.         }
  220.     }

  221.     @Override
  222.     public void startup() throws Exception {
  223.         Iterator<ProductSender> iter = senders.iterator();
  224.         while (iter.hasNext()) {
  225.             iter.next().startup();
  226.         }
  227.     }

  228.     /**
  229.      * Send a product to all ProductSenders concurrently.
  230.      *
  231.      * @param senders        the senders to receive product.
  232.      * @param product        the product to send.
  233.      * @param timeoutSeconds number of seconds before timing out, interrupting any
  234.      *                       pending send.
  235.      * @return exceptions that occured while sending. If map is empty, there were no
  236.      *         exceptions.
  237.      */
  238.     public static Map<ProductSender, Exception> parallelSendProduct(final List<ProductSender> senders,
  239.             final Product product, final long timeoutSeconds) {
  240.         final Map<ProductSender, Boolean> sendComplete = Collections.synchronizedMap(new HashMap<ProductSender, Boolean>());
  241.         final Map<ProductSender, Exception> sendExceptions = Collections
  242.                 .synchronizedMap(new HashMap<ProductSender, Exception>());

  243.         Iterator<ProductSender> iter = senders.iterator();
  244.         List<Callable<Void>> sendTasks = new ArrayList<Callable<Void>>();
  245.         while (iter.hasNext()) {
  246.             final ProductSender sender = iter.next();
  247.             sendComplete.put(sender, false);
  248.             sendTasks.add(() -> {
  249.                 try {
  250.                     sender.sendProduct(product);
  251.                     sendComplete.put(sender, true);
  252.                 } catch (Exception e) {
  253.                     if (e instanceof ProductAlreadyInStorageException) {
  254.                         LOGGER.info("Product already in storage, id=" + product.getId().toString());
  255.                     } else {
  256.                         LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
  257.                         sendExceptions.put(sender, e);
  258.                     }
  259.                 }
  260.                 return null;
  261.             });
  262.         }
  263.         // run in parallel
  264.         ExecutorService sendExecutor = Executors.newFixedThreadPool(senders.size());
  265.         try {
  266.             sendExecutor.invokeAll(sendTasks, timeoutSeconds, TimeUnit.SECONDS);
  267.         } catch (Exception e) {
  268.             // this may be Interupted, NullPointer, or RejectedExecution
  269.             // in any case, this part is done and move on to checking send status
  270.         }
  271.         sendExecutor.shutdown();
  272.         // check whether send completed or was interrupted
  273.         for (ProductSender sender : sendComplete.keySet()) {
  274.             if (!sendComplete.get(sender) && sendExceptions.get(sender) == null) {
  275.                 sendExceptions.put(sender, new InterruptedException());
  276.             }
  277.         }

  278.         return sendExceptions;
  279.     }

  280. }