// AwsBatchIndexer.java

  1. package gov.usgs.earthquake.aws;

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.io.StringReader;
  import java.net.URL;
  import java.nio.charset.StandardCharsets;
  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.util.Date;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.logging.Level;
  import java.util.logging.Logger;

  import javax.json.Json;
  import javax.json.JsonObject;
  import javax.json.JsonReader;

  import gov.usgs.earthquake.distribution.Bootstrappable;
  import gov.usgs.earthquake.indexer.Indexer;
  import gov.usgs.earthquake.product.Product;
  import gov.usgs.earthquake.product.ProductId;
  import gov.usgs.earthquake.util.JDBCConnection;
  import gov.usgs.util.Config;
  import gov.usgs.util.StreamUtils;
  import gov.usgs.util.XmlUtils;

  26. /**
  27.  * Class to index a batch of products that have already been sent to the AWS hub.
  28.  *
  29.  * Reads a list of products to be indexed.
  30.  * Reads indexer from configuration file.
  31.  *
  32.  * For each product, fetch product information from the get_product AWS endpoint
  33.  * and call indexer.onProduct.
  34.  */
  35. public class AwsBatchIndexer implements Bootstrappable {
  36.   /** Force reindex argument */
  37.   public static final String FORCE_REINDEX_ARGUMENT = "--force";
  38.   /** Get product URL argument */
  39.   public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl=";
  40.   /** Argument for indexer configuration name */
  41.   public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName=";
  42.   /** Default indexer configuration name */
  43.   public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer";

  44.   /** Argument for database driver */
  45.   public static final String DATABASE_DRIVER_ARGUMENT = "--databaseDriver=";
  46.   /** Argument for database URL */
  47.   public static final String DATABASE_URL_ARGUMENT = "--databaseUrl=";
  48.   /** Argument for indexer database */
  49.   public static final String INDEXER_DATABASE_ARGUMENT = "--indexerDatabase=";
  50.   /** Default database for indexer */
  51.   public static final String INDEXER_DATABASE_DEFAULT = "indexer";

  52.   /** Logging object. */
  53.   private static final Logger LOGGER = Logger.getLogger(AwsBatchIndexer.class.getName());

  54.   /** Executor where indexing runs. */
  55.   private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

  56.   /** Whether to force indexing. */
  57.   private boolean force = false;

  58.   /** AWS URL for get_product endpoint, with placeholders. */
  59.   private String getProductUrlTemplate = "https://earthquake.usgs.gov/pdl/west"
  60.       + "/get_product?source={source}&type={type}&code={code}&updateTime={updateTime}";

  61.   /** Indexer to process products. */
  62.   private Indexer indexer;


  63.   @Override
  64.   public void run(String[] args) throws Exception {
  65.     // parse arguments
  66.     String databaseDriver = "com.mysql.jdbc.Driver";
  67.     String databaseUrl = null;
  68.     String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT;
  69.     for (final String arg : args) {
  70.       if (arg.startsWith(DATABASE_DRIVER_ARGUMENT)) {
  71.         databaseDriver = arg.replace(DATABASE_DRIVER_ARGUMENT, "");
  72.       } else if (arg.startsWith(DATABASE_URL_ARGUMENT)) {
  73.         databaseUrl = arg.replace(DATABASE_URL_ARGUMENT, "");
  74.       } else if (arg.equals(FORCE_REINDEX_ARGUMENT)) {
  75.         force = true;
  76.       } else if (arg.startsWith(GET_PRODUCT_URL_ARGUMENT)) {
  77.         getProductUrlTemplate = arg.replace(GET_PRODUCT_URL_ARGUMENT, "");
  78.       } else if (arg.startsWith(INDEXER_CONFIG_NAME_ARGUMENT)) {
  79.         indexerConfigName = arg.replace(INDEXER_CONFIG_NAME_ARGUMENT, "");
  80.       }
  81.     }

  82.     // load indexer from configuration
  83.     indexer = (Indexer) Config.getConfig().getObject(indexerConfigName);
  84.     indexer.startup();

  85.     try {
  86.       if (databaseUrl != null) {
  87.         LOGGER.info("Reading product ids from database");
  88.         readProductIdsFromDatabase(databaseDriver, databaseUrl);
  89.       } else {
  90.         LOGGER.info("Reading product ids from stdin");
  91.         readProductIdsFromStdin();
  92.       }
  93.     } finally {
  94.       indexer.shutdown();
  95.     }
  96.   }

  97.   /**
  98.    * Use getProductUrl template to generate URL.
  99.    *
  100.    * Replace "{source}", "{type}", "{code}", and "{updateTime}" placeholders.
  101.    *
  102.    * @param id
  103.    *     which product.
  104.    * @return URL with placeholders replaced.
  105.    * @throws Exception Exception
  106.    */
  107.   public URL getProductUrl(final ProductId id) throws Exception {
  108.     String url = getProductUrlTemplate;
  109.     url = url.replace("{source}", id.getSource());
  110.     url = url.replace("{type}", id.getType());
  111.     url = url.replace("{code}", id.getCode());
  112.     url = url.replace("{updateTime}", XmlUtils.formatDate(id.getUpdateTime()));
  113.     return new URL(url);
  114.   }

  115.   /**
  116.    * Get Product from endpoint.
  117.    *
  118.    * @param id
  119.    *     which product.
  120.    * @return Product object.
  121.    * @throws Exception Exception
  122.    */
  123.   public Product getProduct(final ProductId id) throws Exception {
  124.     final URL url = getProductUrl(id);
  125.     byte[] bytes = StreamUtils.readStream(url);
  126.     try (
  127.         final JsonReader reader = Json.createReader(new StringReader(
  128.             new String(bytes, StandardCharsets.UTF_8)))
  129.     ) {
  130.       // parse message
  131.       final JsonObject json = reader.readObject();
  132.       final JsonNotification notification = new JsonNotification(json);
  133.       return notification.product;
  134.     }
  135.   }

  136.   /**
  137.    * Fetch and Index a product.
  138.    *
  139.    * Called from executor service to process product ids.
  140.    *
  141.    * @param id
  142.    *     which product
  143.    */
  144.   public void processProductId(final ProductId id) {
  145.     long start = new Date().getTime();
  146.     try {
  147.       final Product product = getProduct(id);
  148.       long afterGetProduct = new Date().getTime();
  149.       LOGGER.fine("Loaded product " + id.toString() + " in "
  150.           + (afterGetProduct - start) + " ms");

  151.       indexer.onProduct(product, force);
  152.       LOGGER.info("Indexed " + id.toString()
  153.           + " in " + (new Date().getTime() - afterGetProduct) + " ms");
  154.     } catch (Exception e) {
  155.       LOGGER.log(
  156.           Level.WARNING,
  157.           "Error indexing " + id.toString()
  158.               + " in " + (new Date().getTime() - start) + "ms",
  159.           e);
  160.     }
  161.   }

  162.   /**
  163.    * Read product ids (as urns) from database and submit to executor for processing.
  164.    *
  165.    * @param driver database driver
  166.    * @param url database url
  167.    *
  168.    * @throws Exception exception
  169.    */
  170.   public void readProductIdsFromDatabase(
  171.       final String driver,
  172.       final String url) throws Exception {
  173.     try (
  174.       final JDBCConnection jdbcConnection = new JDBCConnection()
  175.     ) {
  176.       jdbcConnection.setDriver(driver);
  177.       jdbcConnection.setUrl(url);
  178.       jdbcConnection.startup();

  179.       final String sql = "SELECT id, source, type, code, updatetime"
  180.           + " FROM pdl.product h"
  181.           + " WHERE id > ?"
  182.           + " AND NOT EXISTS ("
  183.           + "  SELECT * FROM indexer.productSummary i"
  184.           + "  WHERE h.source=i.source"
  185.           + "  AND h.type=i.type"
  186.           + "  AND h.code=i.code"
  187.           + "  AND h.updatetime=i.updateTime"
  188.           + " )"
  189.           + " ORDER BY id"
  190.           + " LIMIT 500";

  191.       // start at the beginning
  192.       long lastId = -1;
  193.       while (true) {
  194.         try (
  195.           final Connection conn = jdbcConnection.verifyConnection();
  196.           final PreparedStatement statement = conn.prepareStatement(sql);
  197.         ) {
  198.           // load next batch of products
  199.           statement.setLong(1, lastId);
  200.           try (
  201.             final ResultSet rs = statement.executeQuery();
  202.           ) {
  203.             int count = 0;
  204.             while (rs.next()) {
  205.               lastId = rs.getLong("id");
  206.               final ProductId id = new ProductId(
  207.                   rs.getString("source"),
  208.                   rs.getString("type"),
  209.                   rs.getString("code"),
  210.                   new Date(rs.getLong("updatetime")));
  211.               submitProductId(id);
  212.               count++;
  213.             }

  214.             // exit once all products processed
  215.             if (count == 0) {
  216.               LOGGER.info("No more rows returned, exiting");
  217.               break;
  218.             }
  219.           }
  220.         }
  221.       }
  222.     }
  223.   }

  224.   /**
  225.    * Read product ids (as urns) from stdin and submit to executor for processing.
  226.    *
  227.    * @throws Exception Exception
  228.    */
  229.   public void readProductIdsFromStdin() throws Exception {
  230.     // read product ids from stdin
  231.     BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
  232.     String line = null;
  233.     while ((line = br.readLine()) != null) {
  234.       if (line.equals("")) {
  235.         continue;
  236.       }
  237.       // parse product id
  238.       final ProductId id;
  239.       try {
  240.         id = ProductId.parse(line);
  241.       } catch (Exception e) {
  242.         LOGGER.warning("Error parsing product id '" + line + "'");
  243.         continue;
  244.       }
  245.       submitProductId(id);
  246.     }
  247.   }

  248.   /**
  249.    * Submit a product id to the executor service for processing.
  250.    *
  251.    * If queue is too large (500 ids), blocks until queue is smaller (100 ids).
  252.    *
  253.    * @param id
  254.    *     which product
  255.    * @throws InterruptedException InterruptedException
  256.    */
  257.   public void submitProductId(final ProductId id) throws InterruptedException {
  258.     // queue for processing
  259.     executor.submit(() -> processProductId(id));

  260.     // keep queue size smallish
  261.     if (executor.getQueue().size() > 500) {
  262.       while (executor.getQueue().size() > 100) {
  263.         LOGGER.info("Queue size " + executor.getQueue().size());
  264.         Thread.sleep(5000L);
  265.       }
  266.     }
  267.   }
  268. }