AwsBatchIndexer.java
- package gov.usgs.earthquake.aws;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import gov.usgs.earthquake.distribution.Bootstrappable;
import gov.usgs.earthquake.indexer.Indexer;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
- /**
- * Class to index a batch of products that have already been sent to the AWS hub.
- *
- * Reads a list of products to be indexed.
- * Reads indexer from configuration file.
- *
- * For each product, fetch product information from the get_product AWS endpoint
- * and call indexer.onProduct.
- */
- public class AwsBatchIndexer implements Bootstrappable {
- /** Force reindex argument */
- public static final String FORCE_REINDEX_ARGUMENT = "--force";
- /** Get product URL argument */
- public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl=";
- /** Argument for indexer configuration name */
- public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName=";
- /** Default indexer configuration name */
- public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer";
- /** Argument for database driver */
- public static final String DATABASE_DRIVER_ARGUMENT = "--databaseDriver=";
- /** Argument for database URL */
- public static final String DATABASE_URL_ARGUMENT = "--databaseUrl=";
- /** Argument for indexer database */
- public static final String INDEXER_DATABASE_ARGUMENT = "--indexerDatabase=";
- /** Default database for indexer */
- public static final String INDEXER_DATABASE_DEFAULT = "indexer";
- /** Logging object. */
- private static final Logger LOGGER = Logger.getLogger(AwsBatchIndexer.class.getName());
- /** Executor where indexing runs. */
- private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
- /** Whether to force indexing. */
- private boolean force = false;
- /** AWS URL for get_product endpoint, with placeholders. */
- private String getProductUrlTemplate = "https://earthquake.usgs.gov/pdl/west"
- + "/get_product?source={source}&type={type}&code={code}&updateTime={updateTime}";
- /** Indexer to process products. */
- private Indexer indexer;
- @Override
- public void run(String[] args) throws Exception {
- // parse arguments
- String databaseDriver = "com.mysql.jdbc.Driver";
- String databaseUrl = null;
- String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT;
- for (final String arg : args) {
- if (arg.startsWith(DATABASE_DRIVER_ARGUMENT)) {
- databaseDriver = arg.replace(DATABASE_DRIVER_ARGUMENT, "");
- } else if (arg.startsWith(DATABASE_URL_ARGUMENT)) {
- databaseUrl = arg.replace(DATABASE_URL_ARGUMENT, "");
- } else if (arg.equals(FORCE_REINDEX_ARGUMENT)) {
- force = true;
- } else if (arg.startsWith(GET_PRODUCT_URL_ARGUMENT)) {
- getProductUrlTemplate = arg.replace(GET_PRODUCT_URL_ARGUMENT, "");
- } else if (arg.startsWith(INDEXER_CONFIG_NAME_ARGUMENT)) {
- indexerConfigName = arg.replace(INDEXER_CONFIG_NAME_ARGUMENT, "");
- }
- }
- // load indexer from configuration
- indexer = (Indexer) Config.getConfig().getObject(indexerConfigName);
- indexer.startup();
- try {
- if (databaseUrl != null) {
- LOGGER.info("Reading product ids from database");
- readProductIdsFromDatabase(databaseDriver, databaseUrl);
- } else {
- LOGGER.info("Reading product ids from stdin");
- readProductIdsFromStdin();
- }
- } finally {
- indexer.shutdown();
- }
- }
- /**
- * Use getProductUrl template to generate URL.
- *
- * Replace "{source}", "{type}", "{code}", and "{updateTime}" placeholders.
- *
- * @param id
- * which product.
- * @return URL with placeholders replaced.
- * @throws Exception Exception
- */
- public URL getProductUrl(final ProductId id) throws Exception {
- String url = getProductUrlTemplate;
- url = url.replace("{source}", id.getSource());
- url = url.replace("{type}", id.getType());
- url = url.replace("{code}", id.getCode());
- url = url.replace("{updateTime}", XmlUtils.formatDate(id.getUpdateTime()));
- return new URL(url);
- }
- /**
- * Get Product from endpoint.
- *
- * @param id
- * which product.
- * @return Product object.
- * @throws Exception Exception
- */
- public Product getProduct(final ProductId id) throws Exception {
- final URL url = getProductUrl(id);
- byte[] bytes = StreamUtils.readStream(url);
- try (
- final JsonReader reader = Json.createReader(new StringReader(
- new String(bytes, StandardCharsets.UTF_8)))
- ) {
- // parse message
- final JsonObject json = reader.readObject();
- final JsonNotification notification = new JsonNotification(json);
- return notification.product;
- }
- }
- /**
- * Fetch and Index a product.
- *
- * Called from executor service to process product ids.
- *
- * @param id
- * which product
- */
- public void processProductId(final ProductId id) {
- long start = new Date().getTime();
- try {
- final Product product = getProduct(id);
- long afterGetProduct = new Date().getTime();
- LOGGER.fine("Loaded product " + id.toString() + " in "
- + (afterGetProduct - start) + " ms");
- indexer.onProduct(product, force);
- LOGGER.info("Indexed " + id.toString()
- + " in " + (new Date().getTime() - afterGetProduct) + " ms");
- } catch (Exception e) {
- LOGGER.log(
- Level.WARNING,
- "Error indexing " + id.toString()
- + " in " + (new Date().getTime() - start) + "ms",
- e);
- }
- }
- /**
- * Read product ids (as urns) from database and submit to executor for processing.
- *
- * @param driver database driver
- * @param url database url
- *
- * @throws Exception exception
- */
- public void readProductIdsFromDatabase(
- final String driver,
- final String url) throws Exception {
- try (
- final JDBCConnection jdbcConnection = new JDBCConnection()
- ) {
- jdbcConnection.setDriver(driver);
- jdbcConnection.setUrl(url);
- jdbcConnection.startup();
- final String sql = "SELECT id, source, type, code, updatetime"
- + " FROM pdl.product h"
- + " WHERE id > ?"
- + " AND NOT EXISTS ("
- + " SELECT * FROM indexer.productSummary i"
- + " WHERE h.source=i.source"
- + " AND h.type=i.type"
- + " AND h.code=i.code"
- + " AND h.updatetime=i.updateTime"
- + " )"
- + " ORDER BY id"
- + " LIMIT 500";
- // start at the beginning
- long lastId = -1;
- while (true) {
- try (
- final Connection conn = jdbcConnection.verifyConnection();
- final PreparedStatement statement = conn.prepareStatement(sql);
- ) {
- // load next batch of products
- statement.setLong(1, lastId);
- try (
- final ResultSet rs = statement.executeQuery();
- ) {
- int count = 0;
- while (rs.next()) {
- lastId = rs.getLong("id");
- final ProductId id = new ProductId(
- rs.getString("source"),
- rs.getString("type"),
- rs.getString("code"),
- new Date(rs.getLong("updatetime")));
- submitProductId(id);
- count++;
- }
- // exit once all products processed
- if (count == 0) {
- LOGGER.info("No more rows returned, exiting");
- break;
- }
- }
- }
- }
- }
- }
- /**
- * Read product ids (as urns) from stdin and submit to executor for processing.
- *
- * @throws Exception Exception
- */
- public void readProductIdsFromStdin() throws Exception {
- // read product ids from stdin
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- String line = null;
- while ((line = br.readLine()) != null) {
- if (line.equals("")) {
- continue;
- }
- // parse product id
- final ProductId id;
- try {
- id = ProductId.parse(line);
- } catch (Exception e) {
- LOGGER.warning("Error parsing product id '" + line + "'");
- continue;
- }
- submitProductId(id);
- }
- }
- /**
- * Submit a product id to the executor service for processing.
- *
- * If queue is too large (500 ids), blocks until queue is smaller (100 ids).
- *
- * @param id
- * which product
- * @throws InterruptedException InterruptedException
- */
- public void submitProductId(final ProductId id) throws InterruptedException {
- // queue for processing
- executor.submit(() -> processProductId(id));
- // keep queue size smallish
- if (executor.getQueue().size() > 500) {
- while (executor.getQueue().size() > 100) {
- LOGGER.info("Queue size " + executor.getQueue().size());
- Thread.sleep(5000L);
- }
- }
- }
- }