JsonProductStorage.java

package gov.usgs.earthquake.aws;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.json.Json;
import javax.json.JsonReader;

import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.distribution.StorageEvent;
import gov.usgs.earthquake.distribution.StorageListener;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;

/**
 * Store Products in a database.
 *
 * Note that this storage does not store Product Content, and is intended for
 * Products that use URLContent and can be serialized using JsonProduct.
 *
 * Only SQLITE or local development should rely on createSchema.
 * Products (data column) have exceeded 64kb, plan accordingly.
 *
 * Storage listeners are not supported; the listener methods are no-ops.
 *
 * MySQL Schema Example:<br>
 * <pre>
 * CREATE TABLE IF NOT EXISTS indexer_storage
 * (id INTEGER PRIMARY KEY AUTO_INCREMENT
 * , source VARCHAR(255)
 * , type VARCHAR(255)
 * , code VARCHAR(255)
 * , updatetime BIGINT
 * , data LONGTEXT
 * , UNIQUE KEY product_index (source, type, code, updatetime)
 * ) ENGINE=innodb CHARSET=utf8;
 * </pre>
 */
public class JsonProductStorage extends JDBCConnection implements ProductStorage {

  private static final Logger LOGGER = Logger.getLogger(
      JsonProductStorage.class.getName());

  /** Variable for the default driver */
  public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  /** Variable for the default table */
  public static final String DEFAULT_TABLE = "product";
  /** Variable for the default URL */
  public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";

  /** Timeout, in seconds, applied to every prepared statement. */
  private static final int QUERY_TIMEOUT_SECONDS = 60;

  /** Database table name. */
  private String table;

  /**
   * Create a JsonProductStorage using defaults.
   */
  public JsonProductStorage() {
    this(DEFAULT_DRIVER, DEFAULT_URL);
  }

  /**
   * Create a JsonProductStorage with a default table.
   * @param driver Driver to use
   * @param url URL to use
   */
  public JsonProductStorage(final String driver, final String url) {
    this(driver, url, DEFAULT_TABLE);
  }

  /**
   * Create a JsonProductStorage with a custom driver, url, and table.
   * @param driver Driver to use
   * @param url URL to use
   * @param table Table to use
   */
  public JsonProductStorage(
      final String driver, final String url, final String table) {
    super(driver, url);
    this.table = table;
  }

  /** @return table */
  public String getTable() { return this.table; }
  /** @param table Table to set */
  public void setTable(final String table) { this.table = table; }

  /**
   * Configure driver, url, and table, falling back to the class defaults
   * when the parent configuration leaves driver or url unset.
   *
   * @param config configuration to read the "table" property from
   * @throws Exception if the parent configuration fails
   */
  @Override
  public void configure(final Config config) throws Exception {
    super.configure(config);
    if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
    if (getUrl() == null) { setUrl(DEFAULT_URL); }

    setTable(config.getProperty("table", DEFAULT_TABLE));
    LOGGER.config("[" + getName() + "] driver=" + getDriver());
    LOGGER.config("[" + getName() + "] table=" + getTable());
    // do not log url, it may contain user/pass
  }

  /**
   * After normal startup, check whether schema exists and attempt to create.
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    // make sure schema exists
    if (!schemaExists()) {
      LOGGER.warning("[" + getName() + "] schema not found, creating");
      createSchema();
    }
  }

  /**
   * Check whether schema exists by selecting a single row from the table.
   *
   * Any failure of the probe query is interpreted as "schema missing".
   *
   * @return true if the probe query succeeded, otherwise false
   * @throws Exception if the transaction cannot be started
   */
  public boolean schemaExists() throws Exception {
    final String sql = "select * from " + this.table + " limit 1";
    beginTransaction();
    try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
      test.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
      // should throw exception if table does not exist
      try (final ResultSet rs = test.executeQuery()) {
        rs.next();
      }
      commitTransaction();
      // schema exists
      return true;
    } catch (Exception e) {
      // a failing rollback must not turn "schema missing" into an exception
      rollbackQuietly();
      return false;
    }
  }

  /**
   * Attempt to create schema.
   *
   * Only supports sqlite or mysql.  When not using sqlite, relying on this
   * method is only recommended for local development.
   *
   * @throws Exception if error occurs
   */
  public void createSchema() throws Exception {
    // create schema
    beginTransaction();
    try (final Statement statement = getConnection().createStatement()) {
      String autoIncrement = "";
      String engine = "";
      String dataType = "TEXT";
      if (getDriver().contains("mysql")) {
        autoIncrement = " AUTO_INCREMENT";
        engine = " ENGINE=innodb CHARSET=utf8";
        // MySQL TEXT is capped at 64KB, which serialized products exceed
        dataType = "LONGTEXT";
      }
      statement.executeUpdate(
          "CREATE TABLE " + this.table
          + " (id INTEGER PRIMARY KEY" + autoIncrement
          + ", source VARCHAR(255)"
          + ", type VARCHAR(255)"
          + ", code VARCHAR(255)"
          + ", updatetime BIGINT"
          + ", data " + dataType
          + ")" + engine);
      statement.executeUpdate(
          "CREATE UNIQUE INDEX product_index ON " + this.table
          + " (source, type, code, updatetime)");
      commitTransaction();
    } catch (Exception e) {
      // keep the original failure visible even if rollback also fails
      rollbackQuietly();
      throw e;
    }
  }

  /**
   * Check whether product found in storage.
   *
   * Delegates to {@link #getProduct(ProductId)}, so the stored product is
   * fully loaded and parsed.
   *
   * @param id the product to look for
   * @return true if getProduct returned a product
   */
  @Override
  public boolean hasProduct(ProductId id) throws Exception {
    return getProduct(id) != null;
  }

  /**
   * Get a product from storage.
   *
   * SQL errors are rolled back, logged, and reported as "not found" (null).
   * Any other failure (for example unparseable JSON) is rolled back and
   * rethrown so the transaction is never left open.
   *
   * @param id
   *     The product to get.
   * @return product if found, otherwise null.
   * @throws Exception if the transaction cannot be started or the stored
   *     data cannot be parsed
   */
  @Override
  public synchronized Product getProduct(ProductId id) throws Exception {
    Product product = null;
    final String sql = "SELECT * FROM " + this.table
        + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
      bindProductId(statement, id);

      // execute
      try (final ResultSet rs = statement.executeQuery()) {
        if (rs.next()) {
          // found product
          final String data = rs.getString("data");
          if (data != null) {
            product = parseProduct(data);
          } else {
            // row exists but holds no JSON; report instead of failing on null
            LOGGER.warning("[" + getName() + "] null data for product "
                + id.toString());
          }
        }
      }
      commitTransaction();
    } catch (SQLException e) {
      rollbackQuietly();
      LOGGER.log(
          Level.INFO,
          "[" + getName() + "] exception in getProduct("
              + id.toString() + ")",
          e);
    } catch (Exception e) {
      // non-SQL failures (e.g. JSON parsing) must also end the transaction
      rollbackQuietly();
      throw e;
    }
    return product;
  }

  /**
   * Add product to storage.
   *
   * The product is serialized before the transaction starts, so a
   * serialization failure cannot leave a transaction open.
   *
   * @param product the product to store, must not be null
   * @return id of the stored product
   * @throws NullPointerException if product is null
   * @throws ProductAlreadyInStorageException
   *     if product already in storage.
   */
  @Override
  public synchronized ProductId storeProduct(Product product) throws Exception {
    if (product == null) {
      throw new NullPointerException("product must not be null");
    }
    final ProductId id = product.getId();
    final String json = new JsonProduct().getJsonObject(product).toString();
    final String sql = "INSERT INTO " + this.table
        + " (source, type, code, updatetime, data)"
        + " VALUES (?, ?, ?, ?, ?)";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
      bindProductId(statement, id);
      statement.setString(5, json);
      // execute
      statement.executeUpdate();
      commitTransaction();
      return id;
    } catch (SQLException e) {
      rollbackQuietly();
      if (isDuplicateKeyError(e)) {
        throw new ProductAlreadyInStorageException(e.toString());
      }
      throw e;
    } catch (Exception e) {
      rollbackQuietly();
      throw e;
    }
  }

  /**
   * Get a ProductSource for product in database.
   *
   * @param id the product to load
   * @return ObjectProductSource or null if product not found.
   */
  @Override
  public ProductSource getProductSource(ProductId id) throws Exception {
    final Product product = getProduct(id);
    if (product == null) {
      return null;
    }
    return new ObjectProductSource(product);
  }

  /**
   * Store a ProductSource.
   *
   * Uses ObjectProductHandler to read Product, then calls storeProduct.
   *
   * @param input source to read the product from
   * @return id of the stored product
   * @throws ProductAlreadyInStorageException
   *     if product already in storage.
   */
  @Override
  public ProductId storeProductSource(ProductSource input) throws Exception {
    final ObjectProductHandler handler = new ObjectProductHandler();
    input.streamTo(handler);
    return storeProduct(handler.getProduct());
  }

  /**
   * Remove product from storage.
   *
   * @param id the product to delete
   * @throws Exception if the delete fails; the transaction is rolled back
   *     before the exception is rethrown
   */
  @Override
  public synchronized void removeProduct(ProductId id) throws Exception {
    // prepare statement
    final String sql = "DELETE FROM " + this.table
          + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
      bindProductId(statement, id);
      // execute
      statement.executeUpdate();
      commitTransaction();
    } catch (Exception e) {
      rollbackQuietly();
      throw e;
    }
  }

  @Override
  public void notifyListeners(StorageEvent event) {
    // listeners not supported
  }

  @Override
  public void addStorageListener(StorageListener listener) {
    // listeners not supported
  }

  @Override
  public void removeStorageListener(StorageListener listener) {
    // listeners not supported
  }

  /** Bind source, type, code, and update time to parameters 1 through 4. */
  private static void bindProductId(
      final PreparedStatement statement, final ProductId id)
      throws SQLException {
    statement.setString(1, id.getSource());
    statement.setString(2, id.getType());
    statement.setString(3, id.getCode());
    statement.setLong(4, id.getUpdateTime().getTime());
  }

  /** Parse a stored JSON string into a Product, closing the reader afterwards. */
  private static Product parseProduct(final String data) throws Exception {
    // encode explicitly as UTF-8 instead of the platform default charset
    try (final JsonReader reader = Json.createReader(
        new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)))) {
      return new JsonProduct().getProduct(reader.readObject());
    }
  }

  /** Recognize unique-index violations reported by MySQL or SQLite drivers. */
  private static boolean isDuplicateKeyError(final SQLException e) {
    final String text = e.toString();
    // MySQL reports "Duplicate entry"; SQLite reports "UNIQUE constraint failed"
    return text.contains("Duplicate entry")
        || text.contains("UNIQUE constraint failed");
  }

  /** Roll back the current transaction, logging instead of throwing on failure. */
  private void rollbackQuietly() {
    try {
      rollbackTransaction();
    } catch (Exception e) {
      // deliberately not rethrown: the caller is already handling a failure
      LOGGER.log(Level.FINE, "[" + getName() + "] rollback failed", e);
    }
  }

}