JsonProductStorage.java

  1. package gov.usgs.earthquake.aws;

  2. import java.io.ByteArrayInputStream;
  3. import java.sql.PreparedStatement;
  4. import java.sql.ResultSet;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. import java.util.logging.Level;
  8. import java.util.logging.Logger;

  9. import javax.json.Json;

  10. import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
  11. import gov.usgs.earthquake.distribution.ProductStorage;
  12. import gov.usgs.earthquake.distribution.StorageEvent;
  13. import gov.usgs.earthquake.distribution.StorageListener;
  14. import gov.usgs.earthquake.product.Product;
  15. import gov.usgs.earthquake.product.ProductId;
  16. import gov.usgs.earthquake.product.io.JsonProduct;
  17. import gov.usgs.earthquake.product.io.ObjectProductHandler;
  18. import gov.usgs.earthquake.product.io.ObjectProductSource;
  19. import gov.usgs.earthquake.product.io.ProductSource;
  20. import gov.usgs.earthquake.util.JDBCConnection;
  21. import gov.usgs.util.Config;

  22. /**
  23.  * Store Products in a database.
  24.  *
  25.  * Note that this storage does not store Product Content, and is intended for
  26.  * Products that use URLContent and can be serialized using JsonProduct.
  27.  *
  28.  * Only SQLITE or local development should rely on createSchema.
  29.  * Products (data column) have exceeded 64kb, plan accordingly.
  30.  *
  31.  * Mysql Schema Example:<br>
  32.  * <pre>
  33.  * CREATE TABLE IF NOT EXISTS indexer_storage
  34.  * (id INTEGER PRIMARY KEY AUTO_INCREMENT
  35.  * , source VARCHAR(255)
  36.  * , type VARCHAR(255)
  37.  * , code VARCHAR(255)
  38.  * , updatetime BIGINT
  39.  * , data LONGTEXT
  40.  * , UNIQUE KEY product_index (source, type, code, updatetime)
  41.  * ) ENGINE=innodb CHARSET=utf8;
  42.  * </pre>
  43.  */
  44. public class JsonProductStorage extends JDBCConnection implements ProductStorage {

  45.   private static final Logger LOGGER = Logger.getLogger(
  46.       JsonProductStorage.class.getName());

  47.   /** Variable for the default driver */
  48.   public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  49.   /** Variable for the default table */
  50.   public static final String DEFAULT_TABLE = "product";
  51.   /** Variable for the default URL */
  52.   public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";

  53.   /** Database table name. */
  54.   private String table;

  /**
   * Create a JsonProductStorage that uses {@link #DEFAULT_DRIVER},
   * {@link #DEFAULT_URL}, and {@link #DEFAULT_TABLE}.
   */
  public JsonProductStorage() {
    this(DEFAULT_DRIVER, DEFAULT_URL, DEFAULT_TABLE);
  }

  /**
   * Create a JsonProductStorage that stores products in
   * {@link #DEFAULT_TABLE}.
   *
   * @param driver
   *     JDBC driver to use
   * @param url
   *     JDBC url to use
   */
  public JsonProductStorage(final String driver, final String url) {
    this(driver, url, DEFAULT_TABLE);
  }

  /**
   * Create a JsonProductStorage for a specific driver, url, and table.
   *
   * @param driver
   *     JDBC driver to use
   * @param url
   *     JDBC url to use
   * @param table
   *     name of the table where products are stored
   */
  public JsonProductStorage(
      final String driver,
      final String url,
      final String table) {
    super(driver, url);
    this.table = table;
  }

  80.   /** @return table */
  81.   public String getTable() { return this.table; }
  82.   /** @param table Table to set */
  83.   public void setTable(final String table) { this.table = table; }

  84.   @Override
  85.   public void configure(final Config config) throws Exception {
  86.     super.configure(config);
  87.     if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
  88.     if (getUrl() == null) { setUrl(DEFAULT_URL); }

  89.     setTable(config.getProperty("table", DEFAULT_TABLE));
  90.     LOGGER.config("[" + getName() + "] driver=" + getDriver());
  91.     LOGGER.config("[" + getName() + "] table=" + getTable());
  92.     // do not log url, it may contain user/pass
  93.   }

  94.   /**
  95.    * After normal startup, check whether schema exists and attempt to create.
  96.    * @throws Exception if error occurs
  97.    */
  98.   @Override
  99.   public void startup() throws Exception {
  100.     super.startup();
  101.     // make sure schema exists
  102.     if (!schemaExists()) {
  103.       LOGGER.warning("[" + getName() + "] schema not found, creating");
  104.       createSchema();
  105.     }
  106.   }

  107.   /**
  108.    * Check whether schema exists.
  109.    *
  110.    * @return boolean
  111.    * @throws Exception if error occurs
  112.    */
  113.   public boolean schemaExists() throws Exception {
  114.     final String sql = "select * from " + this.table + " limit 1";
  115.     beginTransaction();
  116.     try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
  117.       test.setQueryTimeout(60);
  118.       // should throw exception if table does not exist
  119.       try (final ResultSet rs = test.executeQuery()) {
  120.         rs.next();
  121.       }
  122.       commitTransaction();
  123.       // schema exists
  124.       return true;
  125.     } catch (Exception e) {
  126.       rollbackTransaction();
  127.       return false;
  128.     }
  129.   }

  130.   /**
  131.    * Attempt to create schema.
  132.    *
  133.    * Only supports sqlite or mysql.  When not using sqlite, relying on this
  134.    * method is only recommended for local development.
  135.    *
  136.    * @throws Exception if error occurs
  137.    */
  138.   public void createSchema() throws Exception {
  139.     // create schema
  140.     beginTransaction();
  141.     try (final Statement statement = getConnection().createStatement()) {
  142.       String autoIncrement = "";
  143.       String engine = "";
  144.       if (getDriver().contains("mysql")) {
  145.         autoIncrement = " AUTO_INCREMENT";
  146.         engine = " ENGINE=innodb CHARSET=utf8";
  147.       }
  148.       statement.executeUpdate(
  149.           "CREATE TABLE " + this.table
  150.           + " (id INTEGER PRIMARY KEY" + autoIncrement
  151.           + ", source VARCHAR(255)"
  152.           + ", type VARCHAR(255)"
  153.           + ", code VARCHAR(255)"
  154.           + ", updatetime BIGINT"
  155.           + ", data TEXT"
  156.           + ")" + engine);
  157.       statement.executeUpdate(
  158.           "CREATE UNIQUE INDEX product_index ON " + this.table
  159.           + " (source, type, code, updatetime)");
  160.       commitTransaction();
  161.     } catch (Exception e) {
  162.       rollbackTransaction();
  163.       throw e;
  164.     }
  165.   }

  166.   /**
  167.    * Check whether product found in storage.
  168.    */
  169.   @Override
  170.   public boolean hasProduct(ProductId id) throws Exception {
  171.     return getProduct(id) != null;
  172.   }

  173.   /**
  174.    * Get a product from storage.
  175.    *
  176.    * @param id
  177.    *     The product to get.
  178.    * @return product if found, otherwise null.
  179.    */
  180.   @Override
  181.   public synchronized Product getProduct(ProductId id) throws Exception {
  182.     Product product = null;
  183.     final String sql = "SELECT * FROM " + this.table
  184.         + " WHERE source=? AND type=? AND code=? AND updatetime=?";
  185.     // prepare statement
  186.     beginTransaction();
  187.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  188.       statement.setQueryTimeout(60);
  189.       // set parameters
  190.       statement.setString(1, id.getSource());
  191.       statement.setString(2, id.getType());
  192.       statement.setString(3, id.getCode());
  193.       statement.setLong(4, id.getUpdateTime().getTime());

  194.       // execute
  195.       try (final ResultSet rs = statement.executeQuery()) {
  196.         if (rs.next()) {
  197.           // found product
  198.           final String data = rs.getString("data");
  199.           product = new JsonProduct().getProduct(
  200.               Json.createReader(
  201.                 new ByteArrayInputStream(data.getBytes())
  202.               ).readObject());
  203.         }
  204.       }
  205.       commitTransaction();
  206.     } catch (SQLException e) {
  207.       try {
  208.         // otherwise roll back
  209.         rollbackTransaction();
  210.       } catch (SQLException e2) {
  211.         // ignore
  212.       }
  213.       LOGGER.log(
  214.           Level.INFO,
  215.           "[" + getName() + "] exception in getProduct("
  216.               + id.toString() + ")",
  217.           e);
  218.     }
  219.     return product;
  220.   }

  221.   /**
  222.    * Add product to storage.
  223.    *
  224.    * @throws ProductAlreadyInStorageException
  225.    *     if product already in storage.
  226.    */
  227.   @Override
  228.   public synchronized ProductId storeProduct(Product product) throws Exception {
  229.     // prepare statement
  230.     beginTransaction();
  231.     try (
  232.       final PreparedStatement statement = getConnection().prepareStatement(
  233.           "INSERT INTO " + this.table
  234.           + " (source, type, code, updatetime, data)"
  235.           + " VALUES (?, ?, ?, ?, ?)")
  236.     ) {
  237.       statement.setQueryTimeout(60);
  238.       final ProductId id = product.getId();
  239.       // set parameters
  240.       statement.setString(1, id.getSource());
  241.       statement.setString(2, id.getType());
  242.       statement.setString(3, id.getCode());
  243.       statement.setLong(4, id.getUpdateTime().getTime());
  244.       statement.setString(5,
  245.           product != null
  246.           ? new JsonProduct().getJsonObject(product).toString()
  247.           : "");
  248.       // execute
  249.       statement.executeUpdate();
  250.       commitTransaction();
  251.       return id;
  252.     } catch (SQLException e) {
  253.       try {
  254.         // otherwise roll back
  255.         rollbackTransaction();
  256.       } catch (SQLException e2) {
  257.         // ignore
  258.       }
  259.       if (e.toString().contains("Duplicate entry")) {
  260.         throw new ProductAlreadyInStorageException(e.toString());
  261.       }
  262.       throw e;
  263.     }
  264.   }

  265.   /**
  266.    * Get a ProductSource for product in database.
  267.    *
  268.    * @return ObjectProductSource or null if product not found.
  269.    */
  270.   @Override
  271.   public ProductSource getProductSource(ProductId id) throws Exception {
  272.     final Product product = getProduct(id);
  273.     if (product == null) {
  274.       return null;
  275.     }
  276.     return new ObjectProductSource(product);
  277.   }

  278.   /**
  279.    * Store a ProductSource.
  280.    *
  281.    * Uses ObjectProductHandler to read Product, then calls storeProduct.
  282.    *
  283.    * @throws ProductAlreadyInStorageException
  284.    *     if product already in storage.
  285.    */
  286.   @Override
  287.   public ProductId storeProductSource(ProductSource input) throws Exception {
  288.     final ObjectProductHandler handler = new ObjectProductHandler();
  289.     input.streamTo(handler);
  290.     return storeProduct(handler.getProduct());
  291.   }

  292.   /**
  293.    * Remove product from storage.
  294.    */
  295.   @Override
  296.   public synchronized void removeProduct(ProductId id) throws Exception {
  297.     // prepare statement
  298.     final String sql = "DELETE FROM " + this.table
  299.           + " WHERE source=? AND type=? AND code=? AND updatetime=?";
  300.     beginTransaction();
  301.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  302.       statement.setQueryTimeout(60);
  303.       // set parameters
  304.       statement.setString(1, id.getSource());
  305.       statement.setString(2, id.getType());
  306.       statement.setString(3, id.getCode());
  307.       statement.setLong(4, id.getUpdateTime().getTime());
  308.       // execute
  309.       statement.executeUpdate();
  310.       commitTransaction();
  311.     } catch (SQLException e) {
  312.       try {
  313.         // otherwise roll back
  314.         rollbackTransaction();
  315.       } catch (SQLException e2) {
  316.         // ignore
  317.       }
  318.       throw e;
  319.     }
  320.   }

  321.   @Override
  322.   public void notifyListeners(StorageEvent event) {
  323.     // listeners not supported
  324.   }

  325.   @Override
  326.   public void addStorageListener(StorageListener listener) {
  327.     // listeners not supported
  328.   }

  329.   @Override
  330.   public void removeStorageListener(StorageListener listener) {
  331.     // listeners not supported
  332.   }

  333. }