AwsProductSender.java
- package gov.usgs.earthquake.aws;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.earthquake.distribution.InvalidSignatureException;
- import gov.usgs.earthquake.distribution.ProductSender;
- import gov.usgs.earthquake.product.Content;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.earthquake.product.URLContent;
- import gov.usgs.earthquake.product.io.JsonProduct;
- import gov.usgs.util.Config;
- import gov.usgs.util.CryptoUtils;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.FileUtils;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.XmlUtils;
- import java.io.File;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.security.PrivateKey;
- import java.util.Date;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import javax.json.Json;
- import javax.json.JsonObject;
- /** Send using AWS Hub API. */
- public class AwsProductSender extends DefaultConfigurable implements ProductSender {
- /** Initialzation of logger. For us later in file. */
- public static final Logger LOGGER = Logger.getLogger(AwsProductSender.class.getName());
- /** Base URL for Hub API. */
- public static final String HUB_URL_PROPERTY = "url";
- /** Private Key to sign products, if signProducts is true. */
- public static final String PRIVATE_KEY_PROPERTY = "privateKey";
- /** Whether to sign products using private key. */
- public static final String SIGN_PRODUCTS_PROPERTY = "signProducts";
- /**url where products are sent */
- protected URL hubUrl;
- /** signing key */
- protected PrivateKey privateKey;
- /** wheter to sign products */
- protected boolean signProducts = false;
- /** Connection timeout. 5s seems excessive, but be cautious for now */
- protected int connectTimeout = 5000;
- /** Server-side timeout. Called at getInputStream().read() */
- protected int readTimeout = 30000;
- /** Empty class constructor */
- public AwsProductSender() {}
- public AwsProductSender(URL url) {
- this.hubUrl = url;
- }
- @Override
- public void configure(Config config) throws Exception {
- super.configure(config);
- hubUrl = new URL(config.getProperty(HUB_URL_PROPERTY));
- LOGGER.config("[" + getName() + "] url=" + hubUrl.toString());
- final String sign = config.getProperty(SIGN_PRODUCTS_PROPERTY);
- if (sign != null) {
- signProducts = Boolean.valueOf(sign);
- }
- LOGGER.config("[" + getName() + "] sign products=" + signProducts);
- final String key = config.getProperty(PRIVATE_KEY_PROPERTY);
- if (key != null) {
- privateKey = CryptoUtils.readOpenSSHPrivateKey(
- FileUtils.readFile(new File(key)),
- null);
- LOGGER.config("[" + getName() + "] private key=" + key);
- }
- if (signProducts && privateKey == null) {
- // no key configured
- throw new ConfigurationException("[" + getName() + "] " + SIGN_PRODUCTS_PROPERTY
- + " requires a private key for signing");
- }
- }
- /**
- * Send a product to the hub.
- */
- @Override
- public void sendProduct(final Product product) throws Exception {
- final ProductId id = product.getId();
- // re-sign if configured
- if (signProducts) {
- if (product.getSignature() != null) {
- // preserve original signature
- product.getProperties().put("original-signature", product.getSignature());
- product.getProperties().put("original-signature-version",
- product.getSignatureVersion().toString());
- }
- product.sign(privateKey, CryptoUtils.Version.SIGNATURE_V2);
- }
- // convert to json
- JsonObject json = new JsonProduct().getJsonObject(product);
- final long start = new Date().getTime();
- final long afterUploadContent;
- try {
- // upload contents
- if (
- // has contents
- product.getContents().size() > 0
- // and not only inline content
- && !(product.getContents().size() == 1 && product.getContents().get("") != null)
- ) {
- LOGGER.fine("Getting upload urls for " + json.toString());
- // get upload urls, response is product with signed content urls for upload
- Product uploadProduct;
- try {
- uploadProduct = getUploadUrls(json);
- } catch (HttpException e) {
- HttpURLConnection connection = e.response.connection;
- // check for server error
- if (connection.getResponseCode() >= 500) {
- LOGGER.log(Level.FINE,
- "[" + getName() + "] get upload urls exception, trying again", e);
- // try again after random back off (1-5 s)
- Thread.sleep(1000 + Math.round(4000 * Math.random()));
- uploadProduct = getUploadUrls(json);
- } else {
- // otherwise propagate exception as usual
- throw e;
- }
- }
- final long afterGetUploadUrls = new Date().getTime();
- LOGGER.fine("[" + getName() + "] get upload urls " + id.toString()
- + " (" + (afterGetUploadUrls - start) + " ms) ");
- // upload contents
- try {
- uploadContents(product, uploadProduct);
- } catch (HttpException e) {
- HttpURLConnection connection = e.response.connection;
- // check for S3 "503 Slow Down" error
- if (
- 503 == connection.getResponseCode()
- && "Slow Down".equals(connection.getResponseMessage())
- ) {
- LOGGER.fine("[" + getName() + "] 503 slow down exception, trying again");
- // try again after random back off (1-5 s)
- Thread.sleep(1000 + Math.round(4000 * Math.random()));
- uploadContents(product, uploadProduct);
- } else {
- // otherwise propagate exception as usual
- throw e;
- }
- }
- afterUploadContent = new Date().getTime();
- LOGGER.fine("[" + getName() + "] upload contents " + id.toString()
- + " (" + (afterUploadContent - afterGetUploadUrls) + " ms) ");
- } else {
- afterUploadContent = new Date().getTime();
- }
- try {
- // send product
- sendProduct(json);
- } catch (HttpException e) {
- HttpURLConnection connection = e.response.connection;
- // check for server error
- if (connection.getResponseCode() >= 500) {
- LOGGER.log(Level.FINE,
- "[" + getName() + "] send product exception, trying again", e);
- // try again after random back off (1-5 s)
- Thread.sleep(1000 + Math.round(4000 * Math.random()));
- sendProduct(json);
- } else {
- // otherwise propagate exception as usual
- throw e;
- }
- }
- final long afterSendProduct = new Date().getTime();
- LOGGER.fine("[" + getName() + "] send product " + id.toString()
- + " (" + (afterSendProduct - afterUploadContent) + " ms) ");
- } catch (ProductAlreadySentException pase) {
- // hub already has product
- LOGGER.info("[" + getName() + "] hub already has product");
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception sending product " + id.toString(), e);
- throw e;
- } finally {
- final long end = new Date().getTime();
- LOGGER.info("[" + getName() + "] send product total " + id.toString()
- + " (" + (end - start) + " ms) ");
- }
- }
- /**
- * Get content upload urls.
- *
- * @param json product in json format.
- * @return product with content urls set to upload URLs.
- * @throws Exception Exception
- */
- protected Product getUploadUrls(final JsonObject json) throws Exception {
- final URL url = new URL(hubUrl, "get_upload_urls");
- final HttpResponse result = postProductJson(url, json);
- final int responseCode = result.connection.getResponseCode();
- // check for errors
- if (responseCode == 401) {
- throw new InvalidSignatureException("Invalid product signature");
- } else if (responseCode == 409) {
- throw new ProductAlreadySentException();
- } else if (responseCode != 200) {
- throw new HttpException(result, "Error getting upload urls");
- }
- // successful response is json object with "products" property
- // that is product with upload urls for contents.
- final JsonObject getUploadUrlsResponse = result.getJsonObject();
- final Product product = new JsonProduct().getProduct(
- getUploadUrlsResponse.getJsonObject("product"));
- return product;
- }
- /**
- * Post product json to a hub url.
- *
- * This is a HTTP POST method,
- * with a JSON content body with a "product" property with the product.
- *
- * @param url url of connection
- * @param product product in json format
- * @return new HTTP POST response
- * @throws Exception Exception
- */
- protected HttpResponse postProductJson(final URL url, final JsonObject product) throws Exception {
- // send as attribute, for extensibility
- final JsonObject json = Json.createObjectBuilder().add("product", product).build();
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setConnectTimeout(connectTimeout);
- connection.setReadTimeout(readTimeout);
- connection.setDoOutput(true);
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- try (final OutputStream out = connection.getOutputStream()) {
- out.write(json.toString().getBytes());
- }
- return new HttpResponse(connection);
- }
- /**
- * Send product after content has been uploaded.
- *
- * @param json product in json format.
- * @return product with content urls pointing to hub.
- * @throws Exception Exception
- */
- protected Product sendProduct(final JsonObject json) throws Exception {
- // send request
- final URL url = new URL(hubUrl, "send_product");
- final HttpResponse result = postProductJson(url, json);
- int responseCode = result.connection.getResponseCode();
- // check for errors
- if (responseCode == 401) {
- throw new InvalidSignatureException("Invalid product signature");
- } else if (responseCode == 409) {
- throw new ProductAlreadySentException();
- } else if (responseCode == 422) {
- throw new HttpException(result,
- "Content validation errors: " + result.getJsonObject().toString());
- } else if (result.connection.getResponseCode() != 200) {
- throw new HttpException(result, "Error sending product");
- }
- // successful response is json object with "notification" property
- // that has "created" and "product" properties with hub urls for contents.
- final JsonObject sendProductResponse = result.getJsonObject();
- final JsonObject notification = sendProductResponse.getJsonObject("notification");
- final Product product = new JsonProduct().getProduct(notification.getJsonObject("product"));
- // json response also has "notification_id" property of broadcast that was sent.
- String notificationId = null;
- if (!sendProductResponse.isNull("notification_id")) {
- notificationId = sendProductResponse.getString("notification_id");
- }
- LOGGER.fine("[" + getName() + "] notification id "
- + notificationId + " " + product.getId().toString());
- return product;
- }
- /**
- * Upload content to a signed url.
- *
- * @param path content path.
- * @param content content to upload.
- * @param signedUrl url where content should be uploaded.
- * @return HTTP result
- * @throws Exception Exception
- */
- protected HttpResponse uploadContent(final String path, final Content content, final URL signedUrl)
- throws Exception {
- final long start = new Date().getTime();
- final HttpURLConnection connection = (HttpURLConnection) signedUrl.openConnection();
- connection.setDoOutput(true);
- connection.setConnectTimeout(connectTimeout);
- connection.setReadTimeout(readTimeout);
- // these values are part of signed url and are required
- connection.setRequestMethod("PUT");
- connection.addRequestProperty("Content-Length", content.getLength().toString());
- connection.addRequestProperty("Content-Type", content.getContentType());
- connection.addRequestProperty(
- "x-amz-meta-modified", XmlUtils.formatDate(content.getLastModified()));
- connection.addRequestProperty("x-amz-meta-sha256", content.getSha256());
- // send content
- try (final InputStream in = content.getInputStream();
- final OutputStream out = connection.getOutputStream()) {
- StreamUtils.transferStream(in, out);
- }
- final HttpResponse result = new HttpResponse(connection);
- final long elapsed = (new Date().getTime() - start);
- if (connection.getResponseCode() != 200) {
- throw new HttpException(result, "Error uploading content "
- + path + " (" + elapsed + " ms)");
- }
- LOGGER.finer(
- "["
- + getName()
- + "] uploaded content " + path + " (size= "
- + content.getLength()
- + " bytes) (time= "
- + elapsed
- + " ms)");
- return result;
- }
- /**
- * Upload product contents.
- *
- * Runs uploads in parallel using a parallel stream.
- *
- * This can be called within a custom ForkJoinPool to use a non-default pool,
- * the default pool is shared by the process and based on number of available
- * cores.
- *
- * @param product product to upload.
- * @param uploadProduct product with signed upload urls.
- * @return upload results
- * @throws Exception if any upload errors occur
- */
- protected Map<String, HttpResponse> uploadContents(
- final Product product, final Product uploadProduct) throws Exception {
- // collect results
- final ConcurrentHashMap<String, HttpResponse> uploadResults =
- new ConcurrentHashMap<String, HttpResponse>();
- final ConcurrentHashMap<String, Exception> uploadExceptions =
- new ConcurrentHashMap<String, Exception>();
- // upload contents in parallel
- uploadProduct.getContents().keySet().parallelStream()
- .filter(path -> !"".equals(path))
- .forEach(
- path -> {
- try {
- Content uploadContent = uploadProduct.getContents().get(path);
- if (!(uploadContent instanceof URLContent)) {
- throw new IllegalStateException(
- "Expected URLContent for " + product.getId().toString()
- + " path '" + path + "' but got " + uploadContent);
- }
- uploadResults.put(
- path,
- uploadContent(
- path,
- product.getContents().get(path),
- ((URLContent) uploadContent).getURL()));
- } catch (Exception e) {
- uploadExceptions.put(path, e);
- }
- });
- if (uploadExceptions.size() > 0) {
- Exception e = null;
- // log all
- for (final String path : uploadExceptions.keySet()) {
- e = uploadExceptions.get(path);
- LOGGER.log(Level.WARNING, "Exception uploading content " + path, e);
- }
- // throw last
- throw e;
- }
- return uploadResults;
- }
- /** Getter for signProducts
- * @return boolean
- */
- public boolean getSignProducts() {
- return signProducts;
- }
- /** Setter for signProducts
- * @param sign boolean
- */
- public void setSignProducts(final boolean sign) {
- this.signProducts = sign;
- }
- /** getter for privateKey
- * @return privateKey
- */
- public PrivateKey getPrivateKey() {
- return privateKey;
- }
- /** setting for privateKey
- * @param key PrivateKey
- */
- public void setPrivateKey(final PrivateKey key) {
- this.privateKey = key;
- }
- }