// AwsBatchIndexer.java

package gov.usgs.earthquake.aws;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import gov.usgs.earthquake.distribution.Bootstrappable;
import gov.usgs.earthquake.indexer.Indexer;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;

/**
 * Class to index a batch of products that have already been sent to the AWS hub.
 *
 * Product ids are read from a database when a "--databaseUrl=" argument is
 * given, otherwise from stdin (one product id per line).
 * The indexer is read from the configuration file.
 *
 * For each product, fetch product information from the get_product AWS endpoint
 * and call indexer.onProduct. Fetching and indexing run on a fixed thread pool;
 * {@link #run(String[])} waits for that pool to drain before shutting the
 * indexer down.
 */
public class AwsBatchIndexer implements Bootstrappable {
  /** Force reindex argument */
  public static final String FORCE_REINDEX_ARGUMENT = "--force";
  /** Get product URL argument */
  public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl=";
  /** Argument for indexer configuration name */
  public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName=";
  /** Default indexer configuration name */
  public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer";

  /** Argument for database driver */
  public static final String DATABASE_DRIVER_ARGUMENT = "--databaseDriver=";
  /** Argument for database URL */
  public static final String DATABASE_URL_ARGUMENT = "--databaseUrl=";
  /** Argument for indexer database */
  public static final String INDEXER_DATABASE_ARGUMENT = "--indexerDatabase=";
  /** Default database for indexer */
  public static final String INDEXER_DATABASE_DEFAULT = "indexer";

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(AwsBatchIndexer.class.getName());

  /** Number of threads that fetch and index products. */
  private static final int EXECUTOR_THREADS = 10;
  /** Queue size above which submitProductId starts blocking. */
  private static final int QUEUE_HIGH_WATER = 500;
  /** Queue size submitProductId waits for once it has started blocking. */
  private static final int QUEUE_LOW_WATER = 100;

  /** Executor where indexing runs. */
  private ThreadPoolExecutor executor = newExecutor();

  /** Whether to force indexing. */
  private boolean force = false;

  /** AWS URL for get_product endpoint, with placeholders. */
  private String getProductUrlTemplate = "https://earthquake.usgs.gov/pdl/west"
      + "/get_product?source={source}&type={type}&code={code}&updateTime={updateTime}";

  /** Indexer to process products. */
  private Indexer indexer;

  /** Database (schema) that holds the indexer productSummary table. */
  private String indexerDatabase = INDEXER_DATABASE_DEFAULT;


  /**
   * Parse arguments, start the configured indexer, and index all product ids.
   *
   * Product ids come from the database when a database url argument is
   * present, otherwise from stdin. Reading only queues work on the executor,
   * so this method waits for the executor to finish before shutting down the
   * indexer.
   *
   * @param args
   *     command line arguments; unrecognized arguments are ignored.
   * @throws Exception
   *     if the indexer is not configured or fails to start, if reading
   *     product ids fails, or if the thread is interrupted while waiting.
   */
  @Override
  public void run(String[] args) throws Exception {
    // parse arguments
    String databaseDriver = "com.mysql.jdbc.Driver";
    String databaseUrl = null;
    String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT;
    for (final String arg : args) {
      // substring strips only the leading prefix; String.replace would also
      // remove any later occurrence of the prefix text inside the value
      if (arg.startsWith(DATABASE_DRIVER_ARGUMENT)) {
        databaseDriver = arg.substring(DATABASE_DRIVER_ARGUMENT.length());
      } else if (arg.startsWith(DATABASE_URL_ARGUMENT)) {
        databaseUrl = arg.substring(DATABASE_URL_ARGUMENT.length());
      } else if (arg.equals(FORCE_REINDEX_ARGUMENT)) {
        force = true;
      } else if (arg.startsWith(GET_PRODUCT_URL_ARGUMENT)) {
        getProductUrlTemplate = arg.substring(GET_PRODUCT_URL_ARGUMENT.length());
      } else if (arg.startsWith(INDEXER_CONFIG_NAME_ARGUMENT)) {
        indexerConfigName = arg.substring(INDEXER_CONFIG_NAME_ARGUMENT.length());
      } else if (arg.startsWith(INDEXER_DATABASE_ARGUMENT)) {
        final String name = arg.substring(INDEXER_DATABASE_ARGUMENT.length());
        // the name is concatenated into SQL (identifiers cannot be bound as
        // parameters), so only accept a plain identifier
        if (!name.matches("[A-Za-z0-9_]+")) {
          throw new IllegalArgumentException(
              "Invalid indexer database name '" + name + "'");
        }
        indexerDatabase = name;
      }
    }

    // load indexer from configuration
    indexer = (Indexer) Config.getConfig().getObject(indexerConfigName);
    if (indexer == null) {
      throw new IllegalStateException(
          "No indexer found in configuration with name '" + indexerConfigName + "'");
    }
    indexer.startup();

    // a previous run leaves the executor shut down; start with a usable one
    if (executor.isShutdown()) {
      executor = newExecutor();
    }

    try {
      if (databaseUrl != null) {
        LOGGER.info("Reading product ids from database");
        readProductIdsFromDatabase(databaseDriver, databaseUrl);
      } else {
        LOGGER.info("Reading product ids from stdin");
        readProductIdsFromStdin();
      }
      // everything is submitted; let queued and running tasks finish while
      // the indexer is still running
      awaitIndexing();
    } finally {
      try {
        if (!executor.isTerminated()) {
          // only reached when reading or waiting failed: abandon remaining
          // work so pool threads do not outlive the indexer or the JVM exit
          executor.shutdownNow();
        }
      } finally {
        indexer.shutdown();
      }
    }
  }

  /**
   * Create the executor used for fetching and indexing.
   *
   * @return new fixed size thread pool.
   */
  private static ThreadPoolExecutor newExecutor() {
    return (ThreadPoolExecutor) Executors.newFixedThreadPool(EXECUTOR_THREADS);
  }

  /**
   * Stop accepting new tasks and block until all submitted tasks complete.
   *
   * @throws InterruptedException if interrupted while waiting.
   */
  private void awaitIndexing() throws InterruptedException {
    executor.shutdown();
    while (!executor.awaitTermination(5L, TimeUnit.SECONDS)) {
      LOGGER.info("Waiting for indexing to finish, queue size "
          + executor.getQueue().size());
    }
  }

  /**
   * Use getProductUrl template to generate URL.
   *
   * Replace "{source}", "{type}", "{code}", and "{updateTime}" placeholders.
   * Values are inserted as-is, without URL encoding.
   *
   * @param id
   *     which product.
   * @return URL with placeholders replaced.
   * @throws Exception Exception
   */
  public URL getProductUrl(final ProductId id) throws Exception {
    final String url = getProductUrlTemplate
        .replace("{source}", id.getSource())
        .replace("{type}", id.getType())
        .replace("{code}", id.getCode())
        .replace("{updateTime}", XmlUtils.formatDate(id.getUpdateTime()));
    return new URL(url);
  }

  /**
   * Get Product from endpoint.
   *
   * Reads the whole response, decodes it as UTF-8, and parses it as a
   * JSON notification.
   *
   * @param id
   *     which product.
   * @return Product object.
   * @throws Exception Exception
   */
  public Product getProduct(final ProductId id) throws Exception {
    final URL url = getProductUrl(id);
    final byte[] bytes = StreamUtils.readStream(url);
    final String body = new String(bytes, StandardCharsets.UTF_8);
    try (final JsonReader reader = Json.createReader(new StringReader(body))) {
      // parse message
      final JsonObject json = reader.readObject();
      final JsonNotification notification = new JsonNotification(json);
      return notification.product;
    }
  }

  /**
   * Fetch and Index a product.
   *
   * Called from executor service to process product ids. Exceptions are
   * logged, not rethrown, so one failed product does not stop the batch.
   *
   * @param id
   *     which product
   */
  public void processProductId(final ProductId id) {
    final long start = System.currentTimeMillis();
    try {
      final Product product = getProduct(id);
      final long afterGetProduct = System.currentTimeMillis();
      LOGGER.fine("Loaded product " + id.toString() + " in "
          + (afterGetProduct - start) + " ms");

      indexer.onProduct(product, force);
      LOGGER.info("Indexed " + id.toString()
          + " in " + (System.currentTimeMillis() - afterGetProduct) + " ms");
    } catch (Exception e) {
      LOGGER.log(
          Level.WARNING,
          "Error indexing " + id.toString()
              + " in " + (System.currentTimeMillis() - start) + "ms",
          e);
    }
  }

  /**
   * Read product ids from database and submit to executor for processing.
   *
   * Selects rows from pdl.product that have no matching row (same source,
   * type, code, and update time) in the indexer productSummary table, in
   * batches of 500 ordered by id, until a batch returns no rows.
   *
   * @param driver database driver
   * @param url database url
   *
   * @throws Exception exception
   */
  public void readProductIdsFromDatabase(
      final String driver,
      final String url) throws Exception {
    try (
      final JDBCConnection jdbcConnection = new JDBCConnection()
    ) {
      jdbcConnection.setDriver(driver);
      jdbcConnection.setUrl(url);
      jdbcConnection.startup();

      // indexerDatabase is either the default or was validated as a plain
      // identifier when the arguments were parsed
      final String sql = "SELECT id, source, type, code, updatetime"
          + " FROM pdl.product h"
          + " WHERE id > ?"
          + " AND NOT EXISTS ("
          + "  SELECT * FROM " + indexerDatabase + ".productSummary i"
          + "  WHERE h.source=i.source"
          + "  AND h.type=i.type"
          + "  AND h.code=i.code"
          + "  AND h.updatetime=i.updateTime"
          + " )"
          + " ORDER BY id"
          + " LIMIT 500";

      // start at the beginning
      long lastId = -1;
      while (true) {
        // NOTE(review): this try-with-resources closes conn after every
        // batch; presumably verifyConnection() reconnects when the
        // connection is closed - confirm against JDBCConnection
        try (
          final Connection conn = jdbcConnection.verifyConnection();
          final PreparedStatement statement = conn.prepareStatement(sql);
        ) {
          // load next batch of products
          statement.setLong(1, lastId);
          try (
            final ResultSet rs = statement.executeQuery();
          ) {
            int count = 0;
            while (rs.next()) {
              // remember position so the next batch resumes after this row
              lastId = rs.getLong("id");
              final ProductId id = new ProductId(
                  rs.getString("source"),
                  rs.getString("type"),
                  rs.getString("code"),
                  new Date(rs.getLong("updatetime")));
              submitProductId(id);
              count++;
            }

            // exit once all products processed
            if (count == 0) {
              LOGGER.info("No more rows returned, exiting");
              break;
            }
          }
        }
      }
    }
  }

  /**
   * Read product ids (as urns) from stdin and submit to executor for processing.
   *
   * Expects one product id per line. Blank lines are skipped; lines that
   * cannot be parsed are logged and skipped.
   *
   * @throws Exception Exception
   */
  public void readProductIdsFromStdin() throws Exception {
    // System.in is deliberately not closed; it belongs to the process.
    // Decode explicitly instead of relying on the platform default charset.
    final BufferedReader stdin = new BufferedReader(
        new InputStreamReader(System.in, StandardCharsets.UTF_8));
    String line;
    while ((line = stdin.readLine()) != null) {
      // tolerate surrounding whitespace and whitespace-only lines
      final String urn = line.trim();
      if (urn.isEmpty()) {
        continue;
      }
      // parse product id
      final ProductId id;
      try {
        id = ProductId.parse(urn);
      } catch (Exception e) {
        // keep the cause so the reason the id was rejected is visible
        LOGGER.log(Level.WARNING, "Error parsing product id '" + line + "'", e);
        continue;
      }
      submitProductId(id);
    }
  }

  /**
   * Submit a product id to the executor service for processing.
   *
   * If queue is too large (more than 500 ids), blocks until queue is smaller
   * (at most 100 ids), checking every 5 seconds.
   *
   * @param id
   *     which product
   * @throws InterruptedException InterruptedException
   */
  public void submitProductId(final ProductId id) throws InterruptedException {
    // queue for processing
    executor.submit(() -> processProductId(id));

    // keep queue size smallish
    if (executor.getQueue().size() > QUEUE_HIGH_WATER) {
      while (executor.getQueue().size() > QUEUE_LOW_WATER) {
        LOGGER.info("Queue size " + executor.getQueue().size());
        Thread.sleep(5000L);
      }
    }
  }
}