AwsProductSender.java

  1. package gov.usgs.earthquake.aws;

  2. import gov.usgs.earthquake.distribution.ConfigurationException;
  3. import gov.usgs.earthquake.distribution.InvalidSignatureException;
  4. import gov.usgs.earthquake.distribution.ProductSender;
  5. import gov.usgs.earthquake.product.Content;
  6. import gov.usgs.earthquake.product.Product;
  7. import gov.usgs.earthquake.product.ProductId;
  8. import gov.usgs.earthquake.product.URLContent;
  9. import gov.usgs.earthquake.product.io.JsonProduct;
  10. import gov.usgs.util.Config;
  11. import gov.usgs.util.CryptoUtils;
  12. import gov.usgs.util.DefaultConfigurable;
  13. import gov.usgs.util.FileUtils;
  14. import gov.usgs.util.StreamUtils;
  15. import gov.usgs.util.XmlUtils;
  16. import java.io.File;
  17. import java.io.InputStream;
  18. import java.io.OutputStream;
  19. import java.net.HttpURLConnection;
  20. import java.net.URL;
  21. import java.security.PrivateKey;
  22. import java.util.Date;
  23. import java.util.Map;
  24. import java.util.concurrent.ConcurrentHashMap;
  25. import java.util.logging.Level;
  26. import java.util.logging.Logger;
  27. import javax.json.Json;
  28. import javax.json.JsonObject;

  29. /** Send using AWS Hub API. */
  30. public class AwsProductSender extends DefaultConfigurable implements ProductSender {

  31.   /** Initialzation of logger. For us later in file. */
  32.   public static final Logger LOGGER = Logger.getLogger(AwsProductSender.class.getName());

  33.   /** Base URL for Hub API. */
  34.   public static final String HUB_URL_PROPERTY = "url";
  35.   /** Private Key to sign products, if signProducts is true. */
  36.   public static final String PRIVATE_KEY_PROPERTY = "privateKey";
  37.   /** Whether to sign products using private key. */
  38.   public static final String SIGN_PRODUCTS_PROPERTY = "signProducts";

  39.   /**url where products are sent */
  40.   protected URL hubUrl;
  41.   /** signing key */
  42.   protected PrivateKey privateKey;
  43.   /** wheter to sign products */
  44.   protected boolean signProducts = false;

  45.   /** Connection timeout. 5s seems excessive, but be cautious for now */
  46.   protected int connectTimeout = 5000;
  47.   /** Server-side timeout. Called at getInputStream().read() */
  48.   protected int readTimeout = 30000;

  49.   /** Empty class constructor */
  50.   public AwsProductSender() {}

  51.   public AwsProductSender(URL url) {
  52.     this.hubUrl = url;
  53.   }

  54.   @Override
  55.   public void configure(Config config) throws Exception {
  56.     super.configure(config);

  57.     hubUrl = new URL(config.getProperty(HUB_URL_PROPERTY));
  58.     LOGGER.config("[" + getName() + "] url=" + hubUrl.toString());

  59.     final String sign = config.getProperty(SIGN_PRODUCTS_PROPERTY);
  60.     if (sign != null) {
  61.       signProducts = Boolean.valueOf(sign);
  62.     }
  63.     LOGGER.config("[" + getName() + "] sign products=" + signProducts);

  64.     final String key = config.getProperty(PRIVATE_KEY_PROPERTY);
  65.     if (key != null) {
  66.       privateKey = CryptoUtils.readOpenSSHPrivateKey(
  67.           FileUtils.readFile(new File(key)),
  68.           null);
  69.       LOGGER.config("[" + getName() + "] private key=" + key);
  70.     }

  71.     if (signProducts && privateKey == null) {
  72.       // no key configured
  73.       throw new ConfigurationException("[" + getName() + "] " + SIGN_PRODUCTS_PROPERTY
  74.           + " requires a private key for signing");
  75.     }

  76.   }

  77.   /**
  78.    * Send a product to the hub.
  79.    */
  80.   @Override
  81.   public void sendProduct(final Product product) throws Exception {
  82.     final ProductId id = product.getId();

  83.     // re-sign if configured
  84.     if (signProducts) {
  85.       if (product.getSignature() != null) {
  86.         // preserve original signature
  87.         product.getProperties().put("original-signature", product.getSignature());
  88.         product.getProperties().put("original-signature-version",
  89.             product.getSignatureVersion().toString());
  90.       }
  91.       product.sign(privateKey, CryptoUtils.Version.SIGNATURE_V2);
  92.     }
  93.     // convert to json
  94.     JsonObject json = new JsonProduct().getJsonObject(product);

  95.     final long start = new Date().getTime();
  96.     final long afterUploadContent;
  97.     try {
  98.       // upload contents
  99.       if (
  100.         // has contents
  101.         product.getContents().size() > 0
  102.         // and not only inline content
  103.         && !(product.getContents().size() == 1 && product.getContents().get("") != null)
  104.       ) {
  105.         LOGGER.fine("Getting upload urls for " + json.toString());
  106.         // get upload urls, response is product with signed content urls for upload
  107.         Product uploadProduct;
  108.         try {
  109.           uploadProduct = getUploadUrls(json);
  110.         } catch (HttpException e) {
  111.           HttpURLConnection connection = e.response.connection;
  112.           // check for server error
  113.           if (connection.getResponseCode() >= 500) {
  114.             LOGGER.log(Level.FINE,
  115.                 "[" + getName() + "] get upload urls exception, trying again", e);
  116.             // try again after random back off (1-5 s)
  117.             Thread.sleep(1000 + Math.round(4000 * Math.random()));
  118.             uploadProduct = getUploadUrls(json);
  119.           } else {
  120.             // otherwise propagate exception as usual
  121.             throw e;
  122.           }
  123.         }

  124.         final long afterGetUploadUrls = new Date().getTime();
  125.         LOGGER.fine("[" + getName() + "] get upload urls " + id.toString()
  126.             + " (" + (afterGetUploadUrls - start) + " ms) ");

  127.         // upload contents
  128.         try {
  129.           uploadContents(product, uploadProduct);
  130.         } catch (HttpException e) {
  131.           HttpURLConnection connection = e.response.connection;
  132.           // check for S3 "503 Slow Down" error
  133.           if (
  134.             503 == connection.getResponseCode()
  135.             && "Slow Down".equals(connection.getResponseMessage())
  136.           ) {
  137.             LOGGER.fine("[" + getName() + "] 503 slow down exception, trying again");
  138.             // try again after random back off (1-5 s)
  139.             Thread.sleep(1000 + Math.round(4000 * Math.random()));
  140.             uploadContents(product, uploadProduct);
  141.           } else {
  142.             // otherwise propagate exception as usual
  143.             throw e;
  144.           }
  145.         }

  146.         afterUploadContent = new Date().getTime();
  147.         LOGGER.fine("[" + getName() + "] upload contents " + id.toString()
  148.             + " (" + (afterUploadContent - afterGetUploadUrls) + " ms) ");
  149.       } else {
  150.         afterUploadContent = new Date().getTime();
  151.       }

  152.       try {
  153.         // send product
  154.         sendProduct(json);
  155.       } catch (HttpException e) {
  156.         HttpURLConnection connection = e.response.connection;
  157.         // check for server error
  158.         if (connection.getResponseCode() >= 500) {
  159.           LOGGER.log(Level.FINE,
  160.               "[" + getName() + "] send product exception, trying again", e);
  161.           // try again after random back off (1-5 s)
  162.           Thread.sleep(1000 + Math.round(4000 * Math.random()));
  163.           sendProduct(json);
  164.         } else {
  165.           // otherwise propagate exception as usual
  166.           throw e;
  167.         }
  168.       }

  169.       final long afterSendProduct = new Date().getTime();
  170.       LOGGER.fine("[" + getName() + "] send product " + id.toString()
  171.           + " (" + (afterSendProduct - afterUploadContent) + " ms) ");
  172.     } catch (ProductAlreadySentException pase) {
  173.       // hub already has product
  174.       LOGGER.info("[" + getName() + "] hub already has product");
  175.     } catch (Exception e) {
  176.       LOGGER.log(Level.WARNING, "Exception sending product " + id.toString(), e);
  177.       throw e;
  178.     } finally {
  179.       final long end = new Date().getTime();
  180.       LOGGER.info("[" + getName() + "] send product total " + id.toString()
  181.           + " (" + (end - start) + " ms) ");
  182.     }
  183.   }

  184.   /**
  185.    * Get content upload urls.
  186.    *
  187.    * @param json product in json format.
  188.    * @return product with content urls set to upload URLs.
  189.    * @throws Exception Exception
  190.    */
  191.   protected Product getUploadUrls(final JsonObject json) throws Exception {
  192.     final URL url = new URL(hubUrl, "get_upload_urls");
  193.     final HttpResponse result = postProductJson(url, json);
  194.     final int responseCode = result.connection.getResponseCode();

  195.     // check for errors
  196.     if (responseCode == 401) {
  197.       throw new InvalidSignatureException("Invalid product signature");
  198.     } else if (responseCode == 409) {
  199.       throw new ProductAlreadySentException();
  200.     } else if (responseCode != 200) {
  201.       throw new HttpException(result, "Error getting upload urls");
  202.     }

  203.     // successful response is json object with "products" property
  204.     // that is product with upload urls for contents.
  205.     final JsonObject getUploadUrlsResponse = result.getJsonObject();
  206.     final Product product = new JsonProduct().getProduct(
  207.         getUploadUrlsResponse.getJsonObject("product"));
  208.     return product;
  209.   }

  210.   /**
  211.    * Post product json to a hub url.
  212.    *
  213.    * This is a HTTP POST method,
  214.    * with a JSON content body with a "product" property with the product.
  215.    *
  216.    * @param url url of connection
  217.    * @param product product in json format
  218.    * @return new HTTP POST response
  219.    * @throws Exception Exception
  220.    */
  221.   protected HttpResponse postProductJson(final URL url, final JsonObject product) throws Exception {
  222.     // send as attribute, for extensibility
  223.     final JsonObject json = Json.createObjectBuilder().add("product", product).build();
  224.     HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  225.     connection.setConnectTimeout(connectTimeout);
  226.     connection.setReadTimeout(readTimeout);
  227.     connection.setDoOutput(true);
  228.     connection.setRequestMethod("POST");
  229.     connection.setRequestProperty("Content-Type", "application/json");
  230.     try (final OutputStream out = connection.getOutputStream()) {
  231.       out.write(json.toString().getBytes());
  232.     }
  233.     return new HttpResponse(connection);
  234.   }

  235.   /**
  236.    * Send product after content has been uploaded.
  237.    *
  238.    * @param json product in json format.
  239.    * @return product with content urls pointing to hub.
  240.    * @throws Exception Exception
  241.    */
  242.   protected Product sendProduct(final JsonObject json) throws Exception {
  243.     // send request
  244.     final URL url = new URL(hubUrl, "send_product");
  245.     final HttpResponse result = postProductJson(url, json);
  246.     int responseCode = result.connection.getResponseCode();

  247.     // check for errors
  248.     if (responseCode == 401) {
  249.       throw new InvalidSignatureException("Invalid product signature");
  250.     } else if (responseCode == 409) {
  251.       throw new ProductAlreadySentException();
  252.     } else if (responseCode == 422) {
  253.       throw new HttpException(result,
  254.           "Content validation errors: " + result.getJsonObject().toString());
  255.     } else if (result.connection.getResponseCode() != 200) {
  256.       throw new HttpException(result, "Error sending product");
  257.     }

  258.     // successful response is json object with "notification" property
  259.     // that has "created" and "product" properties with hub urls for contents.
  260.     final JsonObject sendProductResponse = result.getJsonObject();
  261.     final JsonObject notification = sendProductResponse.getJsonObject("notification");
  262.     final Product product = new JsonProduct().getProduct(notification.getJsonObject("product"));
  263.     // json response also has "notification_id" property of broadcast that was sent.
  264.     String notificationId = null;
  265.     if (!sendProductResponse.isNull("notification_id")) {
  266.       notificationId = sendProductResponse.getString("notification_id");
  267.     }
  268.     LOGGER.fine("[" + getName() + "] notification id "
  269.         + notificationId + " " + product.getId().toString());
  270.     return product;
  271.   }

  272.   /**
  273.    * Upload content to a signed url.
  274.    *
  275.    * @param path content path.
  276.    * @param content content to upload.
  277.    * @param signedUrl url where content should be uploaded.
  278.    * @return HTTP result
  279.    * @throws Exception Exception
  280.    */
  281.   protected HttpResponse uploadContent(final String path, final Content content, final URL signedUrl)
  282.       throws Exception {
  283.     final long start = new Date().getTime();
  284.     final HttpURLConnection connection = (HttpURLConnection) signedUrl.openConnection();
  285.     connection.setDoOutput(true);
  286.     connection.setConnectTimeout(connectTimeout);
  287.     connection.setReadTimeout(readTimeout);
  288.     // these values are part of signed url and are required
  289.     connection.setRequestMethod("PUT");
  290.     connection.addRequestProperty("Content-Length", content.getLength().toString());
  291.     connection.addRequestProperty("Content-Type", content.getContentType());
  292.     connection.addRequestProperty(
  293.         "x-amz-meta-modified", XmlUtils.formatDate(content.getLastModified()));
  294.     connection.addRequestProperty("x-amz-meta-sha256", content.getSha256());

  295.     // send content
  296.     try (final InputStream in = content.getInputStream();
  297.         final OutputStream out = connection.getOutputStream()) {
  298.       StreamUtils.transferStream(in, out);
  299.     }
  300.     final HttpResponse result = new HttpResponse(connection);
  301.     final long elapsed = (new Date().getTime() - start);
  302.     if (connection.getResponseCode() != 200) {
  303.       throw new HttpException(result, "Error uploading content "
  304.           + path + " (" + elapsed + " ms)");
  305.     }
  306.     LOGGER.finer(
  307.         "["
  308.             + getName()
  309.             + "] uploaded content " + path + " (size= "
  310.             + content.getLength()
  311.             + " bytes) (time= "
  312.             + elapsed
  313.             + " ms)");
  314.     return result;
  315.   }

  316.   /**
  317.    * Upload product contents.
  318.    *
  319.    * Runs uploads in parallel using a parallel stream.
  320.    *
  321.    * This can be called within a custom ForkJoinPool to use a non-default pool,
  322.    * the default pool is shared by the process and based on number of available
  323.    * cores.
  324.    *
  325.    * @param product product to upload.
  326.    * @param uploadProduct product with signed upload urls.
  327.    * @return upload results
  328.    * @throws Exception if any upload errors occur
  329.    */
  330.   protected Map<String, HttpResponse> uploadContents(
  331.       final Product product, final Product uploadProduct) throws Exception {
  332.     // collect results
  333.     final ConcurrentHashMap<String, HttpResponse> uploadResults =
  334.         new ConcurrentHashMap<String, HttpResponse>();
  335.     final ConcurrentHashMap<String, Exception> uploadExceptions =
  336.         new ConcurrentHashMap<String, Exception>();
  337.     // upload contents in parallel
  338.     uploadProduct.getContents().keySet().parallelStream()
  339.         .filter(path -> !"".equals(path))
  340.         .forEach(
  341.             path -> {
  342.               try {
  343.                 Content uploadContent = uploadProduct.getContents().get(path);
  344.                 if (!(uploadContent instanceof URLContent)) {
  345.                   throw new IllegalStateException(
  346.                       "Expected URLContent for " + product.getId().toString()
  347.                       + " path '" + path + "' but got " + uploadContent);
  348.                 }
  349.                 uploadResults.put(
  350.                     path,
  351.                     uploadContent(
  352.                         path,
  353.                         product.getContents().get(path),
  354.                         ((URLContent) uploadContent).getURL()));
  355.               } catch (Exception e) {
  356.                 uploadExceptions.put(path, e);
  357.               }
  358.             });
  359.     if (uploadExceptions.size() > 0) {
  360.       Exception e = null;
  361.       // log all
  362.       for (final String path : uploadExceptions.keySet()) {
  363.         e = uploadExceptions.get(path);
  364.         LOGGER.log(Level.WARNING, "Exception uploading content " + path, e);
  365.       }
  366.       // throw last
  367.       throw e;
  368.     }
  369.     return uploadResults;
  370.   }
  371.   /** Getter for signProducts
  372.    * @return boolean
  373.    */
  374.   public boolean getSignProducts() {
  375.     return signProducts;
  376.   }

  377.   /** Setter for signProducts
  378.    * @param sign boolean
  379.    */
  380.   public void setSignProducts(final boolean sign) {
  381.     this.signProducts = sign;
  382.   }

  383.   /** getter for privateKey
  384.    * @return privateKey
  385.    */
  386.   public PrivateKey getPrivateKey() {
  387.     return privateKey;
  388.   }

  389.   /** setting for privateKey
  390.    * @param key PrivateKey
  391.    */
  392.   public void setPrivateKey(final PrivateKey key) {
  393.     this.privateKey = key;
  394.   }

  395. }