AwsProductSender.java

package gov.usgs.earthquake.aws;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.InvalidSignatureException;
import gov.usgs.earthquake.distribution.ProductSender;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.URLContent;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.PrivateKey;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonObject;

/** Send using AWS Hub API. */
public class AwsProductSender extends DefaultConfigurable implements ProductSender {

  /** Initialzation of logger. For us later in file. */
  public static final Logger LOGGER = Logger.getLogger(AwsProductSender.class.getName());

  /** Base URL for Hub API. */
  public static final String HUB_URL_PROPERTY = "url";
  /** Private Key to sign products, if signProducts is true. */
  public static final String PRIVATE_KEY_PROPERTY = "privateKey";
  /** Whether to sign products using private key. */
  public static final String SIGN_PRODUCTS_PROPERTY = "signProducts";

  /**url where products are sent */
  protected URL hubUrl;
  /** signing key */
  protected PrivateKey privateKey;
  /** wheter to sign products */
  protected boolean signProducts = false;

  /** Connection timeout. 5s seems excessive, but be cautious for now */
  protected int connectTimeout = 5000;
  /** Server-side timeout. Called at getInputStream().read() */
  protected int readTimeout = 30000;

  /** Empty class constructor */
  public AwsProductSender() {}

  public AwsProductSender(URL url) {
    this.hubUrl = url;
  }

  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    hubUrl = new URL(config.getProperty(HUB_URL_PROPERTY));
    LOGGER.config("[" + getName() + "] url=" + hubUrl.toString());

    final String sign = config.getProperty(SIGN_PRODUCTS_PROPERTY);
    if (sign != null) {
      signProducts = Boolean.valueOf(sign);
    }
    LOGGER.config("[" + getName() + "] sign products=" + signProducts);

    final String key = config.getProperty(PRIVATE_KEY_PROPERTY);
    if (key != null) {
      privateKey = CryptoUtils.readOpenSSHPrivateKey(
          FileUtils.readFile(new File(key)),
          null);
      LOGGER.config("[" + getName() + "] private key=" + key);
    }

    if (signProducts && privateKey == null) {
      // no key configured
      throw new ConfigurationException("[" + getName() + "] " + SIGN_PRODUCTS_PROPERTY
          + " requires a private key for signing");
    }

  }

  /**
   * Send a product to the hub.
   */
  @Override
  public void sendProduct(final Product product) throws Exception {
    final ProductId id = product.getId();

    // re-sign if configured
    if (signProducts) {
      if (product.getSignature() != null) {
        // preserve original signature
        product.getProperties().put("original-signature", product.getSignature());
        product.getProperties().put("original-signature-version",
            product.getSignatureVersion().toString());
      }
      product.sign(privateKey, CryptoUtils.Version.SIGNATURE_V2);
    }
    // convert to json
    JsonObject json = new JsonProduct().getJsonObject(product);

    final long start = new Date().getTime();
    final long afterUploadContent;
    try {
      // upload contents
      if (
        // has contents
        product.getContents().size() > 0
        // and not only inline content
        && !(product.getContents().size() == 1 && product.getContents().get("") != null)
      ) {
        LOGGER.fine("Getting upload urls for " + json.toString());
        // get upload urls, response is product with signed content urls for upload
        Product uploadProduct;
        try {
          uploadProduct = getUploadUrls(json);
        } catch (HttpException e) {
          HttpURLConnection connection = e.response.connection;
          // check for server error
          if (connection.getResponseCode() >= 500) {
            LOGGER.log(Level.FINE,
                "[" + getName() + "] get upload urls exception, trying again", e);
            // try again after random back off (1-5 s)
            Thread.sleep(1000 + Math.round(4000 * Math.random()));
            uploadProduct = getUploadUrls(json);
          } else {
            // otherwise propagate exception as usual
            throw e;
          }
        }

        final long afterGetUploadUrls = new Date().getTime();
        LOGGER.fine("[" + getName() + "] get upload urls " + id.toString()
            + " (" + (afterGetUploadUrls - start) + " ms) ");

        // upload contents
        try {
          uploadContents(product, uploadProduct);
        } catch (HttpException e) {
          HttpURLConnection connection = e.response.connection;
          // check for S3 "503 Slow Down" error
          if (
            503 == connection.getResponseCode()
            && "Slow Down".equals(connection.getResponseMessage())
          ) {
            LOGGER.fine("[" + getName() + "] 503 slow down exception, trying again");
            // try again after random back off (1-5 s)
            Thread.sleep(1000 + Math.round(4000 * Math.random()));
            uploadContents(product, uploadProduct);
          } else {
            // otherwise propagate exception as usual
            throw e;
          }
        }

        afterUploadContent = new Date().getTime();
        LOGGER.fine("[" + getName() + "] upload contents " + id.toString()
            + " (" + (afterUploadContent - afterGetUploadUrls) + " ms) ");
      } else {
        afterUploadContent = new Date().getTime();
      }

      try {
        // send product
        sendProduct(json);
      } catch (HttpException e) {
        HttpURLConnection connection = e.response.connection;
        // check for server error
        if (connection.getResponseCode() >= 500) {
          LOGGER.log(Level.FINE,
              "[" + getName() + "] send product exception, trying again", e);
          // try again after random back off (1-5 s)
          Thread.sleep(1000 + Math.round(4000 * Math.random()));
          sendProduct(json);
        } else {
          // otherwise propagate exception as usual
          throw e;
        }
      }

      final long afterSendProduct = new Date().getTime();
      LOGGER.fine("[" + getName() + "] send product " + id.toString()
          + " (" + (afterSendProduct - afterUploadContent) + " ms) ");
    } catch (ProductAlreadySentException pase) {
      // hub already has product
      LOGGER.info("[" + getName() + "] hub already has product");
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "Exception sending product " + id.toString(), e);
      throw e;
    } finally {
      final long end = new Date().getTime();
      LOGGER.info("[" + getName() + "] send product total " + id.toString()
          + " (" + (end - start) + " ms) ");
    }
  }

  /**
   * Get content upload urls.
   *
   * @param json product in json format.
   * @return product with content urls set to upload URLs.
   * @throws Exception Exception
   */
  protected Product getUploadUrls(final JsonObject json) throws Exception {
    final URL url = new URL(hubUrl, "get_upload_urls");
    final HttpResponse result = postProductJson(url, json);
    final int responseCode = result.connection.getResponseCode();

    // check for errors
    if (responseCode == 401) {
      throw new InvalidSignatureException("Invalid product signature");
    } else if (responseCode == 409) {
      throw new ProductAlreadySentException();
    } else if (responseCode != 200) {
      throw new HttpException(result, "Error getting upload urls");
    }

    // successful response is json object with "products" property
    // that is product with upload urls for contents.
    final JsonObject getUploadUrlsResponse = result.getJsonObject();
    final Product product = new JsonProduct().getProduct(
        getUploadUrlsResponse.getJsonObject("product"));
    return product;
  }

  /**
   * Post product json to a hub url.
   *
   * This is a HTTP POST method,
   * with a JSON content body with a "product" property with the product.
   *
   * @param url url of connection
   * @param product product in json format
   * @return new HTTP POST response
   * @throws Exception Exception
   */
  protected HttpResponse postProductJson(final URL url, final JsonObject product) throws Exception {
    // send as attribute, for extensibility
    final JsonObject json = Json.createObjectBuilder().add("product", product).build();
    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    connection.setConnectTimeout(connectTimeout);
    connection.setReadTimeout(readTimeout);
    connection.setDoOutput(true);
    connection.setRequestMethod("POST");
    connection.setRequestProperty("Content-Type", "application/json");
    try (final OutputStream out = connection.getOutputStream()) {
      out.write(json.toString().getBytes());
    }
    return new HttpResponse(connection);
  }

  /**
   * Send product after content has been uploaded.
   *
   * @param json product in json format.
   * @return product with content urls pointing to hub.
   * @throws Exception Exception
   */
  protected Product sendProduct(final JsonObject json) throws Exception {
    // send request
    final URL url = new URL(hubUrl, "send_product");
    final HttpResponse result = postProductJson(url, json);
    int responseCode = result.connection.getResponseCode();

    // check for errors
    if (responseCode == 401) {
      throw new InvalidSignatureException("Invalid product signature");
    } else if (responseCode == 409) {
      throw new ProductAlreadySentException();
    } else if (responseCode == 422) {
      throw new HttpException(result,
          "Content validation errors: " + result.getJsonObject().toString());
    } else if (result.connection.getResponseCode() != 200) {
      throw new HttpException(result, "Error sending product");
    }

    // successful response is json object with "notification" property
    // that has "created" and "product" properties with hub urls for contents.
    final JsonObject sendProductResponse = result.getJsonObject();
    final JsonObject notification = sendProductResponse.getJsonObject("notification");
    final Product product = new JsonProduct().getProduct(notification.getJsonObject("product"));
    // json response also has "notification_id" property of broadcast that was sent.
    String notificationId = null;
    if (!sendProductResponse.isNull("notification_id")) {
      notificationId = sendProductResponse.getString("notification_id");
    }
    LOGGER.fine("[" + getName() + "] notification id "
        + notificationId + " " + product.getId().toString());
    return product;
  }

  /**
   * Upload content to a signed url.
   *
   * @param path content path.
   * @param content content to upload.
   * @param signedUrl url where content should be uploaded.
   * @return HTTP result
   * @throws Exception Exception
   */
  protected HttpResponse uploadContent(final String path, final Content content, final URL signedUrl)
      throws Exception {
    final long start = new Date().getTime();
    final HttpURLConnection connection = (HttpURLConnection) signedUrl.openConnection();
    connection.setDoOutput(true);
    connection.setConnectTimeout(connectTimeout);
    connection.setReadTimeout(readTimeout);
    // these values are part of signed url and are required
    connection.setRequestMethod("PUT");
    connection.addRequestProperty("Content-Length", content.getLength().toString());
    connection.addRequestProperty("Content-Type", content.getContentType());
    connection.addRequestProperty(
        "x-amz-meta-modified", XmlUtils.formatDate(content.getLastModified()));
    connection.addRequestProperty("x-amz-meta-sha256", content.getSha256());

    // send content
    try (final InputStream in = content.getInputStream();
        final OutputStream out = connection.getOutputStream()) {
      StreamUtils.transferStream(in, out);
    }
    final HttpResponse result = new HttpResponse(connection);
    final long elapsed = (new Date().getTime() - start);
    if (connection.getResponseCode() != 200) {
      throw new HttpException(result, "Error uploading content "
          + path + " (" + elapsed + " ms)");
    }
    LOGGER.finer(
        "["
            + getName()
            + "] uploaded content " + path + " (size= "
            + content.getLength()
            + " bytes) (time= "
            + elapsed
            + " ms)");
    return result;
  }

  /**
   * Upload product contents.
   *
   * Runs uploads in parallel using a parallel stream.
   *
   * This can be called within a custom ForkJoinPool to use a non-default pool,
   * the default pool is shared by the process and based on number of available
   * cores.
   *
   * @param product product to upload.
   * @param uploadProduct product with signed upload urls.
   * @return upload results
   * @throws Exception if any upload errors occur
   */
  protected Map<String, HttpResponse> uploadContents(
      final Product product, final Product uploadProduct) throws Exception {
    // collect results
    final ConcurrentHashMap<String, HttpResponse> uploadResults =
        new ConcurrentHashMap<String, HttpResponse>();
    final ConcurrentHashMap<String, Exception> uploadExceptions =
        new ConcurrentHashMap<String, Exception>();
    // upload contents in parallel
    uploadProduct.getContents().keySet().parallelStream()
        .filter(path -> !"".equals(path))
        .forEach(
            path -> {
              try {
                Content uploadContent = uploadProduct.getContents().get(path);
                if (!(uploadContent instanceof URLContent)) {
                  throw new IllegalStateException(
                      "Expected URLContent for " + product.getId().toString()
                      + " path '" + path + "' but got " + uploadContent);
                }
                uploadResults.put(
                    path,
                    uploadContent(
                        path,
                        product.getContents().get(path),
                        ((URLContent) uploadContent).getURL()));
              } catch (Exception e) {
                uploadExceptions.put(path, e);
              }
            });
    if (uploadExceptions.size() > 0) {
      Exception e = null;
      // log all
      for (final String path : uploadExceptions.keySet()) {
        e = uploadExceptions.get(path);
        LOGGER.log(Level.WARNING, "Exception uploading content " + path, e);
      }
      // throw last
      throw e;
    }
    return uploadResults;
  }
  /** Getter for signProducts
   * @return boolean
   */
  public boolean getSignProducts() {
    return signProducts;
  }

  /** Setter for signProducts
   * @param sign boolean
   */
  public void setSignProducts(final boolean sign) {
    this.signProducts = sign;
  }

  /** getter for privateKey
   * @return privateKey
   */
  public PrivateKey getPrivateKey() {
    return privateKey;
  }

  /** setting for privateKey
   * @param key PrivateKey
   */
  public void setPrivateKey(final PrivateKey key) {
    this.privateKey = key;
  }

}