JsonNotificationIndex.java

package gov.usgs.earthquake.aws;

import java.io.ByteArrayInputStream;
import java.net.URL;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;

import javax.json.Json;


import gov.usgs.earthquake.distribution.DefaultNotification;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
import gov.usgs.util.StringUtils;

/**
 * Store Notifications in a database.
 *
 * Only SQLITE or local development should rely on createSchema.
 * Products (data column) have exceeded 64kb, plan accordingly.
 *
 * Mysql Schema Example:<br>
 * <pre>
 * CREATE TABLE IF NOT EXISTS indexer_receiver_index
 * (id INTEGER PRIMARY KEY AUTO_INCREMENT
 * , created VARCHAR(255)
 * , expires VARCHAR(255)
 * , source VARCHAR(255)
 * , type VARCHAR(255)
 * , code VARCHAR(255)
 * , updatetime BIGINT
 * , url TEXT
 * , data LONGTEXT
 * , KEY source_index (source)
 * , KEY type_index (type)
 * , KEY code_index (code)
 * , KEY expires_index (expires)
 * ) ENGINE=innodb CHARSET=utf8;
 * </pre>
 */
public class JsonNotificationIndex
    extends JDBCConnection
    implements NotificationIndex {

  private static final Logger LOGGER = Logger.getLogger(
      JsonNotificationIndex.class.getName());

  /** Variable for the default driver */
  public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  /** Variable for the default table */
  public static final String DEFAULT_TABLE = "notification";
  /** Variable for the default URL */
  public static final String DEFAULT_URL =
      "jdbc:sqlite:json_notification_index.db";

  /** Database table name. */
  private String table;

  /**
   * Construct a JsonNotification using defaults.
   */
  public JsonNotificationIndex() {
    this(DEFAULT_DRIVER, DEFAULT_URL);
  }

  /**
   * Construct a JsonNotificationIndex with the default table.
   * @param driver Driver to use
   * @param url URL to use
   */
  public JsonNotificationIndex(final String driver, final String url) {
    this(driver, url, DEFAULT_TABLE);
  }

  /**
   * Construct a JsonNotificationIndex with custom driver, url, and table.
   * @param driver Driver to use
   * @param url URL to use
   * @param table Table to use
   */
  public JsonNotificationIndex(
      final String driver, final String url, final String table) {
    super(driver, url);
    this.table = table;
  }

  /** @return table */
  public String getTable() { return this.table; }
  /** @param table Table to set */
  public void setTable(final String table) { this.table = table; }

  @Override
  public void configure(final Config config) throws Exception {
    super.configure(config);
    if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
    if (getUrl() == null) { setUrl(DEFAULT_URL); }

    setTable(config.getProperty("table", DEFAULT_TABLE));
    LOGGER.config("[" + getName() + "] driver=" + getDriver());
    LOGGER.config("[" + getName() + "] table=" + getTable());
    // do not log url, it may contain user/pass
  }

  /**
   * After normal startup, check whether schema exists and attempt to create.
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    // make sure schema exists
    if (!schemaExists()) {
      LOGGER.warning("[" + getName() + "] schema not found, creating");
      createSchema();
    }
  }

  /**
   * Check whether schema exists.
   *
   * @return boolean
   * @throws Exception if error occurs
   */
  public boolean schemaExists() throws Exception {
    final String sql = "select * from " + this.table + " limit 1";
    beginTransaction();
    try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
      // should throw exception if table does not exist
      test.setQueryTimeout(60);
      try (final ResultSet rs = test.executeQuery()) {
        rs.next();
      }
      commitTransaction();
      // schema exists
      return true;
    } catch (Exception e) {
      rollbackTransaction();
      return false;
    }
  }

  /**
   * Attempt to create schema.
   *
   * Only supports sqlite or mysql.  When not using sqlite, relying on this
   * method is only recommended for local development.
   *
   * @throws Exception if error occurs
   */
  public void createSchema() throws Exception {
    // create schema
    beginTransaction();
    try (final Statement statement = getConnection().createStatement()) {
      String autoIncrement = "";
      String engine = "";
      if (getDriver().contains("mysql")) {
        autoIncrement = " AUTO_INCREMENT";
        engine = " ENGINE=innodb CHARSET=utf8";
      }
      statement.executeUpdate(
          "CREATE TABLE " + this.table
          + " (id INTEGER PRIMARY KEY" + autoIncrement
          + ", created VARCHAR(255)"
          + ", expires VARCHAR(255)"
          + ", source VARCHAR(255)"
          + ", type VARCHAR(255)"
          + ", code VARCHAR(255)"
          + ", updatetime BIGINT"
          + ", url TEXT"
          + ", data TEXT"
          + ")" + engine);
      statement.executeUpdate(
          "CREATE INDEX source_index ON " + this.table + " (source)");
      statement.executeUpdate(
          "CREATE INDEX type_index ON " + this.table + " (type)");
      statement.executeUpdate(
          "CREATE INDEX code_index ON " + this.table + " (code)");
      statement.executeUpdate(
          "CREATE INDEX expires_index ON " + this.table + " (expires)");
      commitTransaction();
    } catch (Exception e) {
      rollbackTransaction();
      throw e;
    }
  }

  /**
   * Add a notification to the index.
   *
   * TrackerURLs are ignored.
   * @param notification To be added to index
   * @throws Exception if error occurs
   */
  @Override
  public synchronized void addNotification(Notification notification)
      throws Exception {
    // all notifications
    Instant expires = notification.getExpirationDate().toInstant();
    ProductId id = notification.getProductId();
    // json only
    Instant created = null;
    Product product = null;
    // url only
    URL url = null;
    if (notification instanceof JsonNotification) {
      JsonNotification jsonNotification = (JsonNotification) notification;
      created = jsonNotification.created;
      product = jsonNotification.product;
    } else if (notification instanceof URLNotification) {
      url = ((URLNotification) notification).getProductURL();
    }
    // prepare statement
    beginTransaction();
    try (
      final PreparedStatement statement = getConnection().prepareStatement(
          "INSERT INTO " + this.table
          + " (created, expires, source, type, code, updatetime, url, data)"
          + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
    ) {
      try {
        statement.setQueryTimeout(60);
        // set parameters
        statement.setString(1, created != null ? created.toString() : "");
        statement.setString(2, expires.toString());
        statement.setString(3, id.getSource());
        statement.setString(4, id.getType());
        statement.setString(5, id.getCode());
        statement.setLong(6, id.getUpdateTime().getTime());
        statement.setString(7, url != null ? url.toString() : "");
        if (product == null) {
          statement.setNull(8, Types.VARCHAR);
        } else {
          statement.setString(8,
              new JsonProduct().getJsonObject(product).toString());
        }
        // execute
        statement.executeUpdate();
        commitTransaction();
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception adding notification", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
  }

  /**
   * Remove notification from index.
   *
   * Tracker URLs are ignored.
   * @param notification to be removed from index
   * @throws Exception if error occurs
   */
  @Override
  public synchronized void removeNotification(Notification notification) throws Exception {
    final List<Notification> notifications = new ArrayList<>();
    notifications.add(notification);
    this.removeNotifications(notifications);
  }

  /**
   * Remove notifications from index.
   *
   * Tracker URLs are ignored.
   * @param notifications
   *     notifications to be removed from index
   * @throws Exception if error occurs
   */
  @Override
  public synchronized void removeNotifications(List<Notification> notifications) throws Exception {
    // prepare statement
    final String sql = "DELETE FROM " + this.table
          + " WHERE created=? AND expires=? AND source=? AND type=? AND code=?"
          + " AND updatetime=? AND url=?";
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(60);

        for (Notification notification : notifications) {
          // all notifications
          Instant expires = notification.getExpirationDate().toInstant();
          ProductId id = notification.getProductId();
          // json only
          Instant created = null;
          // url only
          URL url = null;
          if (notification instanceof JsonNotification) {
            JsonNotification jsonNotification = (JsonNotification) notification;
            created = jsonNotification.created;
          } else if (notification instanceof URLNotification) {
            url = ((URLNotification) notification).getProductURL();
          }

          // set parameters
          statement.setString(1, created != null ? created.toString() : "");
          statement.setString(2, expires.toString());
          statement.setString(3, id.getSource());
          statement.setString(4, id.getType());
          statement.setString(5, id.getCode());
          statement.setLong(6, id.getUpdateTime().getTime());
          statement.setString(7, url != null ? url.toString() : "");
          statement.addBatch();
        }
        // execute
        statement.executeBatch();
        commitTransaction();
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception removing notification", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
  }

  /**
   * Search index for notifications.
   *
   * @param source
   *     source, or null for all sources.
   * @param type
   *     type, or null for all types.
   * @param code
   *     code, or null for all codes.
   * @return list with matching notifications, empty if not found.
   * @throws Exception if error occurs
   */
  @Override
  public synchronized List<Notification> findNotifications(
      String source, String type, String code) throws Exception {
    final ArrayList<Object> where = new ArrayList<Object>();
    final ArrayList<String> values = new ArrayList<String>();
    if (source != null) {
      where.add("source=?");
      values.add(source);
    }
    if (type != null) {
      where.add("type=?");
      values.add(type);
    }
    if (code != null) {
      where.add("code=?");
      values.add(code);
    }
    String sql = "SELECT * FROM " + this.table;
    if (where.size() > 0) {
      sql += " WHERE " + StringUtils.join(where, " AND ");
    }
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(1800);

        // set parameters
        for (int i = 0, len=values.size(); i < len; i++) {
          statement.setString(i+1, values.get(i));
        }

        // execute
        final List<Notification> notifications = getNotifications(statement);
        commitTransaction();
        return notifications;
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception finding notifications", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
    return new ArrayList<Notification>();
  }

  /**
   * Search index for notifications.
   *
   * @param sources
   *     sources, or null for all sources.
   * @param types
   *     types, or null for all types.
   * @param codes
   *     codes, or null for all codes.
   * @return list with matching notifications, empty if not found.
   * @throws Exception if error occurs
   */
  @Override
  public synchronized List<Notification> findNotifications(
      List<String> sources, List<String> types, List<String> codes)
      throws Exception {
    final ArrayList<Object> where = new ArrayList<Object>();
    final ArrayList<String> values = new ArrayList<String>();
    if (sources != null && sources.size() > 0) {
      where.add("source IN (" +
          StringUtils.join(
              Collections.nCopies(sources.size(), (Object)"?"),
              ",")
          + ")");
      values.addAll(sources);
    }
    if (types != null && types.size() > 0) {
      where.add("type IN (" +
          StringUtils.join(
              Collections.nCopies(types.size(), (Object)"?"),
              ",")
          + ")");
      values.addAll(types);
    }
    if (codes != null && codes.size() > 0) {
      where.add("code IN (" +
          StringUtils.join(
              Collections.nCopies(codes.size(), (Object)"?"),
              ",")
          + ")");
      values.addAll(codes);
    }
    String sql = "SELECT * FROM " + this.table;
    if (where.size() > 0) {
      sql += " WHERE " + StringUtils.join(where, " AND ");
    } else {
      // searching for all notifications

      // this is typically done to requeue a notification index.
      // run query in a way that returns list of default notifications,
      // (by returning empty created, data, and url)
      // since full details are not needed during requeue
      sql = "SELECT DISTINCT"
          + " '' as created, expires, source, type, code, updateTime"
          + ", '' as url, null as data"
          + " FROM " + this.table
          + " WHERE expires > ?";
      values.add(Instant.now().toString());
    }
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(1800);

        // set parameters
        for (int i = 0, len=values.size(); i < len; i++) {
          statement.setString(i+1, values.get(i));
        }

        // execute
        final List<Notification> notifications = getNotifications(statement);
        commitTransaction();
        return notifications;
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception finding notifications", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
    return new ArrayList<Notification>();
  }

  /**
   * Find notifications with expires time before or equal to current time.
   *
   * @return list with matching notifications, empty if not found.
   * @throws Exception if error occurs
   */
  @Override
  public synchronized List<Notification> findExpiredNotifications() throws Exception {
    final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 5000";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(1800);

        // set parameters
        statement.setString(1, Instant.now().toString());

        // execute
        final List<Notification> notifications = getNotifications(statement);
        commitTransaction();
        return notifications;
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception finding notifications", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
    return new ArrayList<Notification>();

  }

  /**
   * Search index for notifications for a specific product.
   *
   * @param id
   *     the product id to search.
   * @return list with matching notifications, empty if not found.
   * @throws Exception if error occurs
   */
  @Override
  public synchronized List<Notification> findNotifications(ProductId id) throws Exception {
    final String sql = "SELECT * FROM " + this.table
        + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(30);
        // set parameters
        statement.setString(1, id.getSource());
        statement.setString(2, id.getType());
        statement.setString(3, id.getCode());
        statement.setLong(4, id.getUpdateTime().getTime());

        // executes and commit ifsuccessful
        final List<Notification> notifications = getNotifications(statement);
        commitTransaction();
        return notifications;
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception finding notifications", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
    return new ArrayList<Notification>();
  }

  /**
   * This method is used to find notifications present in this index
   * but not present in another JsonNotificationIndex table in the same
   * database.
   *
   * This is used to optimize the queuing process at startup and returns
   * DefaultNotifications.  The receiver process will look up the actual
   * notification object during processing.
   *
   * @param otherTable
   *     name of table in same database.
   * @return
   *     list of notifications found in this indexes table, but not found in the
   *     other table.
   * @throws Exception if error occurs
   */
  public synchronized List<Notification> getMissingNotifications(
      final String otherTable) throws Exception {
    // this is used to requeue a notification index.
    // run query in a way that returns list of default notifications,
    // (by returning empty created, data, and url)
    // since full details are not needed during requeue
    final String sql = "SELECT DISTINCT"
        + " '' as created, t.expires, t.source, t.type, t.code, t.updateTime"
        + ", '' as url, null as data"
        + " FROM " + this.table + " t"
        // only missing if not expired
        + " WHERE t.expires > ?"
        + " AND NOT EXISTS ("
          + "SELECT * FROM " + otherTable
            + " WHERE source=t.source AND type=t.type"
            + " AND code=t.code AND updatetime=t.updateTime"
        + ")";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      try {
        statement.setQueryTimeout(1800);

        // set parameters
        statement.setString(1, Instant.now().toString());

        // execute and commit if successful
        final List<Notification> notifications = getNotifications(statement);
        commitTransaction();
        return notifications;
      } catch (SQLException e) {
        LOGGER.log(Level.WARNING, "Exception finding notifications", e);
        try {
          // otherwise roll back
          rollbackTransaction();
        } catch (SQLException e2) {
          // ignore
        }
      }
    }
    return new ArrayList<Notification>();
  }

  /**
   * Parse notifications from a statement ready to be executed.
   * @param ps PreparedStatement to be parsed
   * @return List of notifications
   * @throws Exception if error occurs
   */
  protected synchronized List<Notification> getNotifications(PreparedStatement ps)
      throws Exception {
    final List<Notification> n = new ArrayList<Notification>();
    try (final ResultSet rs = ps.executeQuery()) {
      while (rs.next()) {
        n.add(parseNotification(
            rs.getString("created"),
            rs.getString("expires"),
            rs.getString("source"),
            rs.getString("type"),
            rs.getString("code"),
            rs.getLong("updatetime"),
            rs.getString("url"),
            rs.getString("data")));
      }
    }
    return n;
  }

  /**
   * Creates and returns a <code>Notification</code> based on the provided data.
   *
   * <ul>
   * <li>Return a JSONNotification if <code>created</code> and <code>data</code>
   * are set
   * <li>Return a URLNotification if <code>url</code> is set
   * <li>Otherwise, return a DefaultNotification
   * </ul>
   * @param created When created
   * @param expires When notification expires
   * @param source sources
   * @param type types
   * @param code codes
   * @param updateTime updateTime
   * @param url URL
   * @param data data
   * @return Notification, JSONNotification, URLNotification, or DefaultNotification
   * @throws Exception if error occurs
   */
  protected Notification parseNotification(
      final String created,
      final String expires,
      final String source,
      final String type,
      final String code,
      final Long updateTime,
      final String url,
      final String data) throws Exception {
    final Notification n;
    final ProductId id = new ProductId(source, type, code, new Date(updateTime));
    final Date expiresDate = Date.from(Instant.parse(expires));
    if (!"".equals(created) && data != null) {
      Product product = new JsonProduct().getProduct(
          Json.createReader(
            new ByteArrayInputStream(data.getBytes())
          ).readObject());
      n = new JsonNotification(Instant.parse(created), product);
    } else if (!"".equals(url)) {
      n = new URLNotification(id, expiresDate, null, new URL(url));
    } else {
      n = new DefaultNotification(id, expiresDate, null);
    }
    return n;
  }

}