JsonProductStorage.java
package gov.usgs.earthquake.aws;
import java.io.ByteArrayInputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.distribution.StorageEvent;
import gov.usgs.earthquake.distribution.StorageListener;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
/**
* Store Products in a database.
*
* Note that this storage does not store Product Content, and is intended for
* Products that use URLContent and can be serialized using JsonProduct.
*
* Only SQLITE or local development should rely on createSchema.
* Products (data column) have exceeded 64kb, plan accordingly.
*
* Mysql Schema Example:<br>
* <pre>
* CREATE TABLE IF NOT EXISTS indexer_storage
* (id INTEGER PRIMARY KEY AUTO_INCREMENT
* , source VARCHAR(255)
* , type VARCHAR(255)
* , code VARCHAR(255)
* , updatetime BIGINT
* , data LONGTEXT
* , UNIQUE KEY product_index (source, type, code, updatetime)
* ) ENGINE=innodb CHARSET=utf8;
* </pre>
*/
public class JsonProductStorage extends JDBCConnection implements ProductStorage {
private static final Logger LOGGER = Logger.getLogger(
JsonProductStorage.class.getName());
/** Variable for the default driver */
public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
/** Variable for the default table */
public static final String DEFAULT_TABLE = "product";
/** Variable for the default URL */
public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";
/** Database table name. */
private String table;
/**
* Create a JsonProductStorage using defaults.
*/
public JsonProductStorage() {
this(DEFAULT_DRIVER, DEFAULT_URL);
}
/**
* Create a JsonProductStorage with a default table.
* @param driver Driver to use
* @param url URL to use
*/
public JsonProductStorage(final String driver, final String url) {
this(driver, url, DEFAULT_TABLE);
}
/**
* Create a JsonProductStorage with a custom driver, url, and table.
* @param driver Driver to use
* @param url URL to use
* @param table Table to use
*/
public JsonProductStorage(
final String driver, final String url, final String table) {
super(driver, url);
this.table = table;
}
/** @return table */
public String getTable() { return this.table; }
/** @param table Table to set */
public void setTable(final String table) { this.table = table; }
@Override
public void configure(final Config config) throws Exception {
super.configure(config);
if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
if (getUrl() == null) { setUrl(DEFAULT_URL); }
setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}
/**
* After normal startup, check whether schema exists and attempt to create.
* @throws Exception if error occurs
*/
@Override
public void startup() throws Exception {
super.startup();
// make sure schema exists
if (!schemaExists()) {
LOGGER.warning("[" + getName() + "] schema not found, creating");
createSchema();
}
}
/**
* Check whether schema exists.
*
* @return boolean
* @throws Exception if error occurs
*/
public boolean schemaExists() throws Exception {
final String sql = "select * from " + this.table + " limit 1";
beginTransaction();
try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
test.setQueryTimeout(60);
// should throw exception if table does not exist
try (final ResultSet rs = test.executeQuery()) {
rs.next();
}
commitTransaction();
// schema exists
return true;
} catch (Exception e) {
rollbackTransaction();
return false;
}
}
/**
* Attempt to create schema.
*
* Only supports sqlite or mysql. When not using sqlite, relying on this
* method is only recommended for local development.
*
* @throws Exception if error occurs
*/
public void createSchema() throws Exception {
// create schema
beginTransaction();
try (final Statement statement = getConnection().createStatement()) {
String autoIncrement = "";
String engine = "";
if (getDriver().contains("mysql")) {
autoIncrement = " AUTO_INCREMENT";
engine = " ENGINE=innodb CHARSET=utf8";
}
statement.executeUpdate(
"CREATE TABLE " + this.table
+ " (id INTEGER PRIMARY KEY" + autoIncrement
+ ", source VARCHAR(255)"
+ ", type VARCHAR(255)"
+ ", code VARCHAR(255)"
+ ", updatetime BIGINT"
+ ", data TEXT"
+ ")" + engine);
statement.executeUpdate(
"CREATE UNIQUE INDEX product_index ON " + this.table
+ " (source, type, code, updatetime)");
commitTransaction();
} catch (Exception e) {
rollbackTransaction();
throw e;
}
}
/**
* Check whether product found in storage.
*/
@Override
public boolean hasProduct(ProductId id) throws Exception {
return getProduct(id) != null;
}
/**
* Get a product from storage.
*
* @param id
* The product to get.
* @return product if found, otherwise null.
*/
@Override
public synchronized Product getProduct(ProductId id) throws Exception {
Product product = null;
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
statement.setString(3, id.getCode());
statement.setLong(4, id.getUpdateTime().getTime());
// execute
try (final ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
// found product
final String data = rs.getString("data");
product = new JsonProduct().getProduct(
Json.createReader(
new ByteArrayInputStream(data.getBytes())
).readObject());
}
}
commitTransaction();
} catch (SQLException e) {
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
LOGGER.log(
Level.INFO,
"[" + getName() + "] exception in getProduct("
+ id.toString() + ")",
e);
}
return product;
}
/**
* Add product to storage.
*
* @throws ProductAlreadyInStorageException
* if product already in storage.
*/
@Override
public synchronized ProductId storeProduct(Product product) throws Exception {
// prepare statement
beginTransaction();
try (
final PreparedStatement statement = getConnection().prepareStatement(
"INSERT INTO " + this.table
+ " (source, type, code, updatetime, data)"
+ " VALUES (?, ?, ?, ?, ?)")
) {
statement.setQueryTimeout(60);
final ProductId id = product.getId();
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
statement.setString(3, id.getCode());
statement.setLong(4, id.getUpdateTime().getTime());
statement.setString(5,
product != null
? new JsonProduct().getJsonObject(product).toString()
: "");
// execute
statement.executeUpdate();
commitTransaction();
return id;
} catch (SQLException e) {
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
if (e.toString().contains("Duplicate entry")) {
throw new ProductAlreadyInStorageException(e.toString());
}
throw e;
}
}
/**
* Get a ProductSource for product in database.
*
* @return ObjectProductSource or null if product not found.
*/
@Override
public ProductSource getProductSource(ProductId id) throws Exception {
final Product product = getProduct(id);
if (product == null) {
return null;
}
return new ObjectProductSource(product);
}
/**
* Store a ProductSource.
*
* Uses ObjectProductHandler to read Product, then calls storeProduct.
*
* @throws ProductAlreadyInStorageException
* if product already in storage.
*/
@Override
public ProductId storeProductSource(ProductSource input) throws Exception {
final ObjectProductHandler handler = new ObjectProductHandler();
input.streamTo(handler);
return storeProduct(handler.getProduct());
}
/**
* Remove product from storage.
*/
@Override
public synchronized void removeProduct(ProductId id) throws Exception {
// prepare statement
final String sql = "DELETE FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
statement.setString(3, id.getCode());
statement.setLong(4, id.getUpdateTime().getTime());
// execute
statement.executeUpdate();
commitTransaction();
} catch (SQLException e) {
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
throw e;
}
}
@Override
public void notifyListeners(StorageEvent event) {
// listeners not supported
}
@Override
public void addStorageListener(StorageListener listener) {
// listeners not supported
}
@Override
public void removeStorageListener(StorageListener listener) {
// listeners not supported
}
}