JsonNotificationIndex.java
package gov.usgs.earthquake.aws;
import java.io.ByteArrayInputStream;
import java.net.URL;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;
import javax.json.Json;
import gov.usgs.earthquake.distribution.DefaultNotification;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
import gov.usgs.util.StringUtils;
/**
* Store Notifications in a database.
*
* Only SQLITE or local development should rely on createSchema.
* Products (data column) have exceeded 64kb, plan accordingly.
*
* Mysql Schema Example:<br>
* <pre>
* CREATE TABLE IF NOT EXISTS indexer_receiver_index
* (id INTEGER PRIMARY KEY AUTO_INCREMENT
* , created VARCHAR(255)
* , expires VARCHAR(255)
* , source VARCHAR(255)
* , type VARCHAR(255)
* , code VARCHAR(255)
* , updatetime BIGINT
* , url TEXT
* , data LONGTEXT
* , KEY source_index (source)
* , KEY type_index (type)
* , KEY code_index (code)
* , KEY expires_index (expires)
* ) ENGINE=innodb CHARSET=utf8;
* </pre>
*/
public class JsonNotificationIndex
extends JDBCConnection
implements NotificationIndex {
private static final Logger LOGGER = Logger.getLogger(
JsonNotificationIndex.class.getName());
/** Variable for the default driver */
public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
/** Variable for the default table */
public static final String DEFAULT_TABLE = "notification";
/** Variable for the default URL */
public static final String DEFAULT_URL =
"jdbc:sqlite:json_notification_index.db";
/** Database table name. */
private String table;
/**
* Construct a JsonNotification using defaults.
*/
public JsonNotificationIndex() {
this(DEFAULT_DRIVER, DEFAULT_URL);
}
/**
* Construct a JsonNotificationIndex with the default table.
* @param driver Driver to use
* @param url URL to use
*/
public JsonNotificationIndex(final String driver, final String url) {
this(driver, url, DEFAULT_TABLE);
}
/**
* Construct a JsonNotificationIndex with custom driver, url, and table.
* @param driver Driver to use
* @param url URL to use
* @param table Table to use
*/
public JsonNotificationIndex(
final String driver, final String url, final String table) {
super(driver, url);
this.table = table;
}
/** @return table */
public String getTable() { return this.table; }
/** @param table Table to set */
public void setTable(final String table) { this.table = table; }
@Override
public void configure(final Config config) throws Exception {
super.configure(config);
if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
if (getUrl() == null) { setUrl(DEFAULT_URL); }
setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}
/**
* After normal startup, check whether schema exists and attempt to create.
* @throws Exception if error occurs
*/
@Override
public void startup() throws Exception {
super.startup();
// make sure schema exists
if (!schemaExists()) {
LOGGER.warning("[" + getName() + "] schema not found, creating");
createSchema();
}
}
/**
* Check whether schema exists.
*
* @return boolean
* @throws Exception if error occurs
*/
public boolean schemaExists() throws Exception {
final String sql = "select * from " + this.table + " limit 1";
beginTransaction();
try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
// should throw exception if table does not exist
test.setQueryTimeout(60);
try (final ResultSet rs = test.executeQuery()) {
rs.next();
}
commitTransaction();
// schema exists
return true;
} catch (Exception e) {
rollbackTransaction();
return false;
}
}
/**
* Attempt to create schema.
*
* Only supports sqlite or mysql. When not using sqlite, relying on this
* method is only recommended for local development.
*
* @throws Exception if error occurs
*/
public void createSchema() throws Exception {
// create schema
beginTransaction();
try (final Statement statement = getConnection().createStatement()) {
String autoIncrement = "";
String engine = "";
if (getDriver().contains("mysql")) {
autoIncrement = " AUTO_INCREMENT";
engine = " ENGINE=innodb CHARSET=utf8";
}
statement.executeUpdate(
"CREATE TABLE " + this.table
+ " (id INTEGER PRIMARY KEY" + autoIncrement
+ ", created VARCHAR(255)"
+ ", expires VARCHAR(255)"
+ ", source VARCHAR(255)"
+ ", type VARCHAR(255)"
+ ", code VARCHAR(255)"
+ ", updatetime BIGINT"
+ ", url TEXT"
+ ", data TEXT"
+ ")" + engine);
statement.executeUpdate(
"CREATE INDEX source_index ON " + this.table + " (source)");
statement.executeUpdate(
"CREATE INDEX type_index ON " + this.table + " (type)");
statement.executeUpdate(
"CREATE INDEX code_index ON " + this.table + " (code)");
statement.executeUpdate(
"CREATE INDEX expires_index ON " + this.table + " (expires)");
commitTransaction();
} catch (Exception e) {
rollbackTransaction();
throw e;
}
}
/**
* Add a notification to the index.
*
* TrackerURLs are ignored.
* @param notification To be added to index
* @throws Exception if error occurs
*/
@Override
public synchronized void addNotification(Notification notification)
throws Exception {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
ProductId id = notification.getProductId();
// json only
Instant created = null;
Product product = null;
// url only
URL url = null;
if (notification instanceof JsonNotification) {
JsonNotification jsonNotification = (JsonNotification) notification;
created = jsonNotification.created;
product = jsonNotification.product;
} else if (notification instanceof URLNotification) {
url = ((URLNotification) notification).getProductURL();
}
// prepare statement
beginTransaction();
try (
final PreparedStatement statement = getConnection().prepareStatement(
"INSERT INTO " + this.table
+ " (created, expires, source, type, code, updatetime, url, data)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
) {
try {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, created != null ? created.toString() : "");
statement.setString(2, expires.toString());
statement.setString(3, id.getSource());
statement.setString(4, id.getType());
statement.setString(5, id.getCode());
statement.setLong(6, id.getUpdateTime().getTime());
statement.setString(7, url != null ? url.toString() : "");
if (product == null) {
statement.setNull(8, Types.VARCHAR);
} else {
statement.setString(8,
new JsonProduct().getJsonObject(product).toString());
}
// execute
statement.executeUpdate();
commitTransaction();
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception adding notification", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
}
/**
* Remove notification from index.
*
* Tracker URLs are ignored.
* @param notification to be removed from index
* @throws Exception if error occurs
*/
@Override
public synchronized void removeNotification(Notification notification) throws Exception {
final List<Notification> notifications = new ArrayList<>();
notifications.add(notification);
this.removeNotifications(notifications);
}
/**
* Remove notifications from index.
*
* Tracker URLs are ignored.
* @param notifications
* notifications to be removed from index
* @throws Exception if error occurs
*/
@Override
public synchronized void removeNotifications(List<Notification> notifications) throws Exception {
// prepare statement
final String sql = "DELETE FROM " + this.table
+ " WHERE created=? AND expires=? AND source=? AND type=? AND code=?"
+ " AND updatetime=? AND url=?";
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(60);
for (Notification notification : notifications) {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
ProductId id = notification.getProductId();
// json only
Instant created = null;
// url only
URL url = null;
if (notification instanceof JsonNotification) {
JsonNotification jsonNotification = (JsonNotification) notification;
created = jsonNotification.created;
} else if (notification instanceof URLNotification) {
url = ((URLNotification) notification).getProductURL();
}
// set parameters
statement.setString(1, created != null ? created.toString() : "");
statement.setString(2, expires.toString());
statement.setString(3, id.getSource());
statement.setString(4, id.getType());
statement.setString(5, id.getCode());
statement.setLong(6, id.getUpdateTime().getTime());
statement.setString(7, url != null ? url.toString() : "");
statement.addBatch();
}
// execute
statement.executeBatch();
commitTransaction();
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception removing notification", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
}
/**
* Search index for notifications.
*
* @param source
* source, or null for all sources.
* @param type
* type, or null for all types.
* @param code
* code, or null for all codes.
* @return list with matching notifications, empty if not found.
* @throws Exception if error occurs
*/
@Override
public synchronized List<Notification> findNotifications(
String source, String type, String code) throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
final ArrayList<String> values = new ArrayList<String>();
if (source != null) {
where.add("source=?");
values.add(source);
}
if (type != null) {
where.add("type=?");
values.add(type);
}
if (code != null) {
where.add("code=?");
values.add(code);
}
String sql = "SELECT * FROM " + this.table;
if (where.size() > 0) {
sql += " WHERE " + StringUtils.join(where, " AND ");
}
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1800);
// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
statement.setString(i+1, values.get(i));
}
// execute
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}
/**
* Search index for notifications.
*
* @param sources
* sources, or null for all sources.
* @param types
* types, or null for all types.
* @param codes
* codes, or null for all codes.
* @return list with matching notifications, empty if not found.
* @throws Exception if error occurs
*/
@Override
public synchronized List<Notification> findNotifications(
List<String> sources, List<String> types, List<String> codes)
throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
final ArrayList<String> values = new ArrayList<String>();
if (sources != null && sources.size() > 0) {
where.add("source IN (" +
StringUtils.join(
Collections.nCopies(sources.size(), (Object)"?"),
",")
+ ")");
values.addAll(sources);
}
if (types != null && types.size() > 0) {
where.add("type IN (" +
StringUtils.join(
Collections.nCopies(types.size(), (Object)"?"),
",")
+ ")");
values.addAll(types);
}
if (codes != null && codes.size() > 0) {
where.add("code IN (" +
StringUtils.join(
Collections.nCopies(codes.size(), (Object)"?"),
",")
+ ")");
values.addAll(codes);
}
String sql = "SELECT * FROM " + this.table;
if (where.size() > 0) {
sql += " WHERE " + StringUtils.join(where, " AND ");
} else {
// searching for all notifications
// this is typically done to requeue a notification index.
// run query in a way that returns list of default notifications,
// (by returning empty created, data, and url)
// since full details are not needed during requeue
sql = "SELECT DISTINCT"
+ " '' as created, expires, source, type, code, updateTime"
+ ", '' as url, null as data"
+ " FROM " + this.table
+ " WHERE expires > ?";
values.add(Instant.now().toString());
}
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1800);
// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
statement.setString(i+1, values.get(i));
}
// execute
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}
/**
* Find notifications with expires time before or equal to current time.
*
* @return list with matching notifications, empty if not found.
* @throws Exception if error occurs
*/
@Override
public synchronized List<Notification> findExpiredNotifications() throws Exception {
final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 5000";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1800);
// set parameters
statement.setString(1, Instant.now().toString());
// execute
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}
/**
* Search index for notifications for a specific product.
*
* @param id
* the product id to search.
* @return list with matching notifications, empty if not found.
* @throws Exception if error occurs
*/
@Override
public synchronized List<Notification> findNotifications(ProductId id) throws Exception {
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(30);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
statement.setString(3, id.getCode());
statement.setLong(4, id.getUpdateTime().getTime());
// executes and commit ifsuccessful
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}
/**
* This method is used to find notifications present in this index
* but not present in another JsonNotificationIndex table in the same
* database.
*
* This is used to optimize the queuing process at startup and returns
* DefaultNotifications. The receiver process will look up the actual
* notification object during processing.
*
* @param otherTable
* name of table in same database.
* @return
* list of notifications found in this indexes table, but not found in the
* other table.
* @throws Exception if error occurs
*/
public synchronized List<Notification> getMissingNotifications(
final String otherTable) throws Exception {
// this is used to requeue a notification index.
// run query in a way that returns list of default notifications,
// (by returning empty created, data, and url)
// since full details are not needed during requeue
final String sql = "SELECT DISTINCT"
+ " '' as created, t.expires, t.source, t.type, t.code, t.updateTime"
+ ", '' as url, null as data"
+ " FROM " + this.table + " t"
// only missing if not expired
+ " WHERE t.expires > ?"
+ " AND NOT EXISTS ("
+ "SELECT * FROM " + otherTable
+ " WHERE source=t.source AND type=t.type"
+ " AND code=t.code AND updatetime=t.updateTime"
+ ")";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1800);
// set parameters
statement.setString(1, Instant.now().toString());
// execute and commit if successful
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}
/**
* Parse notifications from a statement ready to be executed.
* @param ps PreparedStatement to be parsed
* @return List of notifications
* @throws Exception if error occurs
*/
protected synchronized List<Notification> getNotifications(PreparedStatement ps)
throws Exception {
final List<Notification> n = new ArrayList<Notification>();
try (final ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
n.add(parseNotification(
rs.getString("created"),
rs.getString("expires"),
rs.getString("source"),
rs.getString("type"),
rs.getString("code"),
rs.getLong("updatetime"),
rs.getString("url"),
rs.getString("data")));
}
}
return n;
}
/**
* Creates and returns a <code>Notification</code> based on the provided data.
*
* <ul>
* <li>Return a JSONNotification if <code>created</code> and <code>data</code>
* are set
* <li>Return a URLNotification if <code>url</code> is set
* <li>Otherwise, return a DefaultNotification
* </ul>
* @param created When created
* @param expires When notification expires
* @param source sources
* @param type types
* @param code codes
* @param updateTime updateTime
* @param url URL
* @param data data
* @return Notification, JSONNotification, URLNotification, or DefaultNotification
* @throws Exception if error occurs
*/
protected Notification parseNotification(
final String created,
final String expires,
final String source,
final String type,
final String code,
final Long updateTime,
final String url,
final String data) throws Exception {
final Notification n;
final ProductId id = new ProductId(source, type, code, new Date(updateTime));
final Date expiresDate = Date.from(Instant.parse(expires));
if (!"".equals(created) && data != null) {
Product product = new JsonProduct().getProduct(
Json.createReader(
new ByteArrayInputStream(data.getBytes())
).readObject());
n = new JsonNotification(Instant.parse(created), product);
} else if (!"".equals(url)) {
n = new URLNotification(id, expiresDate, null, new URL(url));
} else {
n = new DefaultNotification(id, expiresDate, null);
}
return n;
}
}