JsonNotificationIndex.java

  1. package gov.usgs.earthquake.aws;

  2. import java.io.ByteArrayInputStream;
  3. import java.net.URL;
  4. import java.sql.PreparedStatement;
  5. import java.sql.ResultSet;
  6. import java.sql.SQLException;
  7. import java.sql.Statement;
  8. import java.sql.Types;
  9. import java.time.Instant;
  10. import java.util.ArrayList;
  11. import java.util.Collections;
  12. import java.util.Date;
  13. import java.util.List;
  14. import java.util.logging.Logger;
  15. import java.util.logging.Level;

  16. import javax.json.Json;


  17. import gov.usgs.earthquake.distribution.DefaultNotification;
  18. import gov.usgs.earthquake.distribution.Notification;
  19. import gov.usgs.earthquake.distribution.NotificationIndex;
  20. import gov.usgs.earthquake.distribution.URLNotification;
  21. import gov.usgs.earthquake.product.Product;
  22. import gov.usgs.earthquake.product.ProductId;
  23. import gov.usgs.earthquake.product.io.JsonProduct;
  24. import gov.usgs.earthquake.util.JDBCConnection;
  25. import gov.usgs.util.Config;
  26. import gov.usgs.util.StringUtils;

  27. /**
  28.  * Store Notifications in a database.
  29.  *
  30.  * Only SQLITE or local development should rely on createSchema.
  31.  * Products (data column) have exceeded 64kb, plan accordingly.
  32.  *
  33.  * Mysql Schema Example:<br>
  34.  * <pre>
  35.  * CREATE TABLE IF NOT EXISTS indexer_receiver_index
  36.  * (id INTEGER PRIMARY KEY AUTO_INCREMENT
  37.  * , created VARCHAR(255)
  38.  * , expires VARCHAR(255)
  39.  * , source VARCHAR(255)
  40.  * , type VARCHAR(255)
  41.  * , code VARCHAR(255)
  42.  * , updatetime BIGINT
  43.  * , url TEXT
  44.  * , data LONGTEXT
  45.  * , KEY source_index (source)
  46.  * , KEY type_index (type)
  47.  * , KEY code_index (code)
  48.  * , KEY expires_index (expires)
  49.  * ) ENGINE=innodb CHARSET=utf8;
  50.  * </pre>
  51.  */
  52. public class JsonNotificationIndex
  53.     extends JDBCConnection
  54.     implements NotificationIndex {

  55.   private static final Logger LOGGER = Logger.getLogger(
  56.       JsonNotificationIndex.class.getName());

  57.   /** Variable for the default driver */
  58.   public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  59.   /** Variable for the default table */
  60.   public static final String DEFAULT_TABLE = "notification";
  61.   /** Variable for the default URL */
  62.   public static final String DEFAULT_URL =
  63.       "jdbc:sqlite:json_notification_index.db";

  64.   /** Database table name. */
  65.   private String table;

  66.   /**
  67.    * Construct a JsonNotification using defaults.
  68.    */
  69.   public JsonNotificationIndex() {
  70.     this(DEFAULT_DRIVER, DEFAULT_URL);
  71.   }

  72.   /**
  73.    * Construct a JsonNotificationIndex with the default table.
  74.    * @param driver Driver to use
  75.    * @param url URL to use
  76.    */
  77.   public JsonNotificationIndex(final String driver, final String url) {
  78.     this(driver, url, DEFAULT_TABLE);
  79.   }

  80.   /**
  81.    * Construct a JsonNotificationIndex with custom driver, url, and table.
  82.    * @param driver Driver to use
  83.    * @param url URL to use
  84.    * @param table Table to use
  85.    */
  86.   public JsonNotificationIndex(
  87.       final String driver, final String url, final String table) {
  88.     super(driver, url);
  89.     this.table = table;
  90.   }

  91.   /** @return table */
  92.   public String getTable() { return this.table; }
  93.   /** @param table Table to set */
  94.   public void setTable(final String table) { this.table = table; }

  95.   @Override
  96.   public void configure(final Config config) throws Exception {
  97.     super.configure(config);
  98.     if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
  99.     if (getUrl() == null) { setUrl(DEFAULT_URL); }

  100.     setTable(config.getProperty("table", DEFAULT_TABLE));
  101.     LOGGER.config("[" + getName() + "] driver=" + getDriver());
  102.     LOGGER.config("[" + getName() + "] table=" + getTable());
  103.     // do not log url, it may contain user/pass
  104.   }

  105.   /**
  106.    * After normal startup, check whether schema exists and attempt to create.
  107.    * @throws Exception if error occurs
  108.    */
  109.   @Override
  110.   public void startup() throws Exception {
  111.     super.startup();
  112.     // make sure schema exists
  113.     if (!schemaExists()) {
  114.       LOGGER.warning("[" + getName() + "] schema not found, creating");
  115.       createSchema();
  116.     }
  117.   }

  118.   /**
  119.    * Check whether schema exists.
  120.    *
  121.    * @return boolean
  122.    * @throws Exception if error occurs
  123.    */
  124.   public boolean schemaExists() throws Exception {
  125.     final String sql = "select * from " + this.table + " limit 1";
  126.     beginTransaction();
  127.     try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
  128.       // should throw exception if table does not exist
  129.       test.setQueryTimeout(60);
  130.       try (final ResultSet rs = test.executeQuery()) {
  131.         rs.next();
  132.       }
  133.       commitTransaction();
  134.       // schema exists
  135.       return true;
  136.     } catch (Exception e) {
  137.       rollbackTransaction();
  138.       return false;
  139.     }
  140.   }

  141.   /**
  142.    * Attempt to create schema.
  143.    *
  144.    * Only supports sqlite or mysql.  When not using sqlite, relying on this
  145.    * method is only recommended for local development.
  146.    *
  147.    * @throws Exception if error occurs
  148.    */
  149.   public void createSchema() throws Exception {
  150.     // create schema
  151.     beginTransaction();
  152.     try (final Statement statement = getConnection().createStatement()) {
  153.       String autoIncrement = "";
  154.       String engine = "";
  155.       if (getDriver().contains("mysql")) {
  156.         autoIncrement = " AUTO_INCREMENT";
  157.         engine = " ENGINE=innodb CHARSET=utf8";
  158.       }
  159.       statement.executeUpdate(
  160.           "CREATE TABLE " + this.table
  161.           + " (id INTEGER PRIMARY KEY" + autoIncrement
  162.           + ", created VARCHAR(255)"
  163.           + ", expires VARCHAR(255)"
  164.           + ", source VARCHAR(255)"
  165.           + ", type VARCHAR(255)"
  166.           + ", code VARCHAR(255)"
  167.           + ", updatetime BIGINT"
  168.           + ", url TEXT"
  169.           + ", data TEXT"
  170.           + ")" + engine);
  171.       statement.executeUpdate(
  172.           "CREATE INDEX source_index ON " + this.table + " (source)");
  173.       statement.executeUpdate(
  174.           "CREATE INDEX type_index ON " + this.table + " (type)");
  175.       statement.executeUpdate(
  176.           "CREATE INDEX code_index ON " + this.table + " (code)");
  177.       statement.executeUpdate(
  178.           "CREATE INDEX expires_index ON " + this.table + " (expires)");
  179.       commitTransaction();
  180.     } catch (Exception e) {
  181.       rollbackTransaction();
  182.       throw e;
  183.     }
  184.   }

  185.   /**
  186.    * Add a notification to the index.
  187.    *
  188.    * TrackerURLs are ignored.
  189.    * @param notification To be added to index
  190.    * @throws Exception if error occurs
  191.    */
  192.   @Override
  193.   public synchronized void addNotification(Notification notification)
  194.       throws Exception {
  195.     // all notifications
  196.     Instant expires = notification.getExpirationDate().toInstant();
  197.     ProductId id = notification.getProductId();
  198.     // json only
  199.     Instant created = null;
  200.     Product product = null;
  201.     // url only
  202.     URL url = null;
  203.     if (notification instanceof JsonNotification) {
  204.       JsonNotification jsonNotification = (JsonNotification) notification;
  205.       created = jsonNotification.created;
  206.       product = jsonNotification.product;
  207.     } else if (notification instanceof URLNotification) {
  208.       url = ((URLNotification) notification).getProductURL();
  209.     }
  210.     // prepare statement
  211.     beginTransaction();
  212.     try (
  213.       final PreparedStatement statement = getConnection().prepareStatement(
  214.           "INSERT INTO " + this.table
  215.           + " (created, expires, source, type, code, updatetime, url, data)"
  216.           + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
  217.     ) {
  218.       try {
  219.         statement.setQueryTimeout(60);
  220.         // set parameters
  221.         statement.setString(1, created != null ? created.toString() : "");
  222.         statement.setString(2, expires.toString());
  223.         statement.setString(3, id.getSource());
  224.         statement.setString(4, id.getType());
  225.         statement.setString(5, id.getCode());
  226.         statement.setLong(6, id.getUpdateTime().getTime());
  227.         statement.setString(7, url != null ? url.toString() : "");
  228.         if (product == null) {
  229.           statement.setNull(8, Types.VARCHAR);
  230.         } else {
  231.           statement.setString(8,
  232.               new JsonProduct().getJsonObject(product).toString());
  233.         }
  234.         // execute
  235.         statement.executeUpdate();
  236.         commitTransaction();
  237.       } catch (SQLException e) {
  238.         LOGGER.log(Level.WARNING, "Exception adding notification", e);
  239.         try {
  240.           // otherwise roll back
  241.           rollbackTransaction();
  242.         } catch (SQLException e2) {
  243.           // ignore
  244.         }
  245.       }
  246.     }
  247.   }

  248.   /**
  249.    * Remove notification from index.
  250.    *
  251.    * Tracker URLs are ignored.
  252.    * @param notification to be removed from index
  253.    * @throws Exception if error occurs
  254.    */
  255.   @Override
  256.   public synchronized void removeNotification(Notification notification) throws Exception {
  257.     final List<Notification> notifications = new ArrayList<>();
  258.     notifications.add(notification);
  259.     this.removeNotifications(notifications);
  260.   }

  261.   /**
  262.    * Remove notifications from index.
  263.    *
  264.    * Tracker URLs are ignored.
  265.    * @param notifications
  266.    *     notifications to be removed from index
  267.    * @throws Exception if error occurs
  268.    */
  269.   @Override
  270.   public synchronized void removeNotifications(List<Notification> notifications) throws Exception {
  271.     // prepare statement
  272.     final String sql = "DELETE FROM " + this.table
  273.           + " WHERE created=? AND expires=? AND source=? AND type=? AND code=?"
  274.           + " AND updatetime=? AND url=?";
  275.     beginTransaction();
  276.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  277.       try {
  278.         statement.setQueryTimeout(60);

  279.         for (Notification notification : notifications) {
  280.           // all notifications
  281.           Instant expires = notification.getExpirationDate().toInstant();
  282.           ProductId id = notification.getProductId();
  283.           // json only
  284.           Instant created = null;
  285.           // url only
  286.           URL url = null;
  287.           if (notification instanceof JsonNotification) {
  288.             JsonNotification jsonNotification = (JsonNotification) notification;
  289.             created = jsonNotification.created;
  290.           } else if (notification instanceof URLNotification) {
  291.             url = ((URLNotification) notification).getProductURL();
  292.           }

  293.           // set parameters
  294.           statement.setString(1, created != null ? created.toString() : "");
  295.           statement.setString(2, expires.toString());
  296.           statement.setString(3, id.getSource());
  297.           statement.setString(4, id.getType());
  298.           statement.setString(5, id.getCode());
  299.           statement.setLong(6, id.getUpdateTime().getTime());
  300.           statement.setString(7, url != null ? url.toString() : "");
  301.           statement.addBatch();
  302.         }
  303.         // execute
  304.         statement.executeBatch();
  305.         commitTransaction();
  306.       } catch (SQLException e) {
  307.         LOGGER.log(Level.WARNING, "Exception removing notification", e);
  308.         try {
  309.           // otherwise roll back
  310.           rollbackTransaction();
  311.         } catch (SQLException e2) {
  312.           // ignore
  313.         }
  314.       }
  315.     }
  316.   }

  317.   /**
  318.    * Search index for notifications.
  319.    *
  320.    * @param source
  321.    *     source, or null for all sources.
  322.    * @param type
  323.    *     type, or null for all types.
  324.    * @param code
  325.    *     code, or null for all codes.
  326.    * @return list with matching notifications, empty if not found.
  327.    * @throws Exception if error occurs
  328.    */
  329.   @Override
  330.   public synchronized List<Notification> findNotifications(
  331.       String source, String type, String code) throws Exception {
  332.     final ArrayList<Object> where = new ArrayList<Object>();
  333.     final ArrayList<String> values = new ArrayList<String>();
  334.     if (source != null) {
  335.       where.add("source=?");
  336.       values.add(source);
  337.     }
  338.     if (type != null) {
  339.       where.add("type=?");
  340.       values.add(type);
  341.     }
  342.     if (code != null) {
  343.       where.add("code=?");
  344.       values.add(code);
  345.     }
  346.     String sql = "SELECT * FROM " + this.table;
  347.     if (where.size() > 0) {
  348.       sql += " WHERE " + StringUtils.join(where, " AND ");
  349.     }
  350.     // prepare statement
  351.     beginTransaction();
  352.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  353.       try {
  354.         statement.setQueryTimeout(1800);

  355.         // set parameters
  356.         for (int i = 0, len=values.size(); i < len; i++) {
  357.           statement.setString(i+1, values.get(i));
  358.         }

  359.         // execute
  360.         final List<Notification> notifications = getNotifications(statement);
  361.         commitTransaction();
  362.         return notifications;
  363.       } catch (SQLException e) {
  364.         LOGGER.log(Level.WARNING, "Exception finding notifications", e);
  365.         try {
  366.           // otherwise roll back
  367.           rollbackTransaction();
  368.         } catch (SQLException e2) {
  369.           // ignore
  370.         }
  371.       }
  372.     }
  373.     return new ArrayList<Notification>();
  374.   }

  375.   /**
  376.    * Search index for notifications.
  377.    *
  378.    * @param sources
  379.    *     sources, or null for all sources.
  380.    * @param types
  381.    *     types, or null for all types.
  382.    * @param codes
  383.    *     codes, or null for all codes.
  384.    * @return list with matching notifications, empty if not found.
  385.    * @throws Exception if error occurs
  386.    */
  387.   @Override
  388.   public synchronized List<Notification> findNotifications(
  389.       List<String> sources, List<String> types, List<String> codes)
  390.       throws Exception {
  391.     final ArrayList<Object> where = new ArrayList<Object>();
  392.     final ArrayList<String> values = new ArrayList<String>();
  393.     if (sources != null && sources.size() > 0) {
  394.       where.add("source IN (" +
  395.           StringUtils.join(
  396.               Collections.nCopies(sources.size(), (Object)"?"),
  397.               ",")
  398.           + ")");
  399.       values.addAll(sources);
  400.     }
  401.     if (types != null && types.size() > 0) {
  402.       where.add("type IN (" +
  403.           StringUtils.join(
  404.               Collections.nCopies(types.size(), (Object)"?"),
  405.               ",")
  406.           + ")");
  407.       values.addAll(types);
  408.     }
  409.     if (codes != null && codes.size() > 0) {
  410.       where.add("code IN (" +
  411.           StringUtils.join(
  412.               Collections.nCopies(codes.size(), (Object)"?"),
  413.               ",")
  414.           + ")");
  415.       values.addAll(codes);
  416.     }
  417.     String sql = "SELECT * FROM " + this.table;
  418.     if (where.size() > 0) {
  419.       sql += " WHERE " + StringUtils.join(where, " AND ");
  420.     } else {
  421.       // searching for all notifications

  422.       // this is typically done to requeue a notification index.
  423.       // run query in a way that returns list of default notifications,
  424.       // (by returning empty created, data, and url)
  425.       // since full details are not needed during requeue
  426.       sql = "SELECT DISTINCT"
  427.           + " '' as created, expires, source, type, code, updateTime"
  428.           + ", '' as url, null as data"
  429.           + " FROM " + this.table
  430.           + " WHERE expires > ?";
  431.       values.add(Instant.now().toString());
  432.     }
  433.     // prepare statement
  434.     beginTransaction();
  435.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  436.       try {
  437.         statement.setQueryTimeout(1800);

  438.         // set parameters
  439.         for (int i = 0, len=values.size(); i < len; i++) {
  440.           statement.setString(i+1, values.get(i));
  441.         }

  442.         // execute
  443.         final List<Notification> notifications = getNotifications(statement);
  444.         commitTransaction();
  445.         return notifications;
  446.       } catch (SQLException e) {
  447.         LOGGER.log(Level.WARNING, "Exception finding notifications", e);
  448.         try {
  449.           // otherwise roll back
  450.           rollbackTransaction();
  451.         } catch (SQLException e2) {
  452.           // ignore
  453.         }
  454.       }
  455.     }
  456.     return new ArrayList<Notification>();
  457.   }

  458.   /**
  459.    * Find notifications with expires time before or equal to current time.
  460.    *
  461.    * @return list with matching notifications, empty if not found.
  462.    * @throws Exception if error occurs
  463.    */
  464.   @Override
  465.   public synchronized List<Notification> findExpiredNotifications() throws Exception {
  466.     final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 5000";
  467.     // prepare statement
  468.     beginTransaction();
  469.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  470.       try {
  471.         statement.setQueryTimeout(1800);

  472.         // set parameters
  473.         statement.setString(1, Instant.now().toString());

  474.         // execute
  475.         final List<Notification> notifications = getNotifications(statement);
  476.         commitTransaction();
  477.         return notifications;
  478.       } catch (SQLException e) {
  479.         LOGGER.log(Level.WARNING, "Exception finding notifications", e);
  480.         try {
  481.           // otherwise roll back
  482.           rollbackTransaction();
  483.         } catch (SQLException e2) {
  484.           // ignore
  485.         }
  486.       }
  487.     }
  488.     return new ArrayList<Notification>();

  489.   }

  490.   /**
  491.    * Search index for notifications for a specific product.
  492.    *
  493.    * @param id
  494.    *     the product id to search.
  495.    * @return list with matching notifications, empty if not found.
  496.    * @throws Exception if error occurs
  497.    */
  498.   @Override
  499.   public synchronized List<Notification> findNotifications(ProductId id) throws Exception {
  500.     final String sql = "SELECT * FROM " + this.table
  501.         + " WHERE source=? AND type=? AND code=? AND updatetime=?";
  502.     // prepare statement
  503.     beginTransaction();
  504.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  505.       try {
  506.         statement.setQueryTimeout(30);
  507.         // set parameters
  508.         statement.setString(1, id.getSource());
  509.         statement.setString(2, id.getType());
  510.         statement.setString(3, id.getCode());
  511.         statement.setLong(4, id.getUpdateTime().getTime());

  512.         // executes and commit ifsuccessful
  513.         final List<Notification> notifications = getNotifications(statement);
  514.         commitTransaction();
  515.         return notifications;
  516.       } catch (SQLException e) {
  517.         LOGGER.log(Level.WARNING, "Exception finding notifications", e);
  518.         try {
  519.           // otherwise roll back
  520.           rollbackTransaction();
  521.         } catch (SQLException e2) {
  522.           // ignore
  523.         }
  524.       }
  525.     }
  526.     return new ArrayList<Notification>();
  527.   }

  528.   /**
  529.    * This method is used to find notifications present in this index
  530.    * but not present in another JsonNotificationIndex table in the same
  531.    * database.
  532.    *
  533.    * This is used to optimize the queuing process at startup and returns
  534.    * DefaultNotifications.  The receiver process will look up the actual
  535.    * notification object during processing.
  536.    *
  537.    * @param otherTable
  538.    *     name of table in same database.
  539.    * @return
  540.    *     list of notifications found in this indexes table, but not found in the
  541.    *     other table.
  542.    * @throws Exception if error occurs
  543.    */
  544.   public synchronized List<Notification> getMissingNotifications(
  545.       final String otherTable) throws Exception {
  546.     // this is used to requeue a notification index.
  547.     // run query in a way that returns list of default notifications,
  548.     // (by returning empty created, data, and url)
  549.     // since full details are not needed during requeue
  550.     final String sql = "SELECT DISTINCT"
  551.         + " '' as created, t.expires, t.source, t.type, t.code, t.updateTime"
  552.         + ", '' as url, null as data"
  553.         + " FROM " + this.table + " t"
  554.         // only missing if not expired
  555.         + " WHERE t.expires > ?"
  556.         + " AND NOT EXISTS ("
  557.           + "SELECT * FROM " + otherTable
  558.             + " WHERE source=t.source AND type=t.type"
  559.             + " AND code=t.code AND updatetime=t.updateTime"
  560.         + ")";
  561.     // prepare statement
  562.     beginTransaction();
  563.     try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
  564.       try {
  565.         statement.setQueryTimeout(1800);

  566.         // set parameters
  567.         statement.setString(1, Instant.now().toString());

  568.         // execute and commit if successful
  569.         final List<Notification> notifications = getNotifications(statement);
  570.         commitTransaction();
  571.         return notifications;
  572.       } catch (SQLException e) {
  573.         LOGGER.log(Level.WARNING, "Exception finding notifications", e);
  574.         try {
  575.           // otherwise roll back
  576.           rollbackTransaction();
  577.         } catch (SQLException e2) {
  578.           // ignore
  579.         }
  580.       }
  581.     }
  582.     return new ArrayList<Notification>();
  583.   }

  584.   /**
  585.    * Parse notifications from a statement ready to be executed.
  586.    * @param ps PreparedStatement to be parsed
  587.    * @return List of notifications
  588.    * @throws Exception if error occurs
  589.    */
  590.   protected synchronized List<Notification> getNotifications(PreparedStatement ps)
  591.       throws Exception {
  592.     final List<Notification> n = new ArrayList<Notification>();
  593.     try (final ResultSet rs = ps.executeQuery()) {
  594.       while (rs.next()) {
  595.         n.add(parseNotification(
  596.             rs.getString("created"),
  597.             rs.getString("expires"),
  598.             rs.getString("source"),
  599.             rs.getString("type"),
  600.             rs.getString("code"),
  601.             rs.getLong("updatetime"),
  602.             rs.getString("url"),
  603.             rs.getString("data")));
  604.       }
  605.     }
  606.     return n;
  607.   }

  608.   /**
  609.    * Creates and returns a <code>Notification</code> based on the provided data.
  610.    *
  611.    * <ul>
  612.    * <li>Return a JSONNotification if <code>created</code> and <code>data</code>
  613.    * are set
  614.    * <li>Return a URLNotification if <code>url</code> is set
  615.    * <li>Otherwise, return a DefaultNotification
  616.    * </ul>
  617.    * @param created When created
  618.    * @param expires When notification expires
  619.    * @param source sources
  620.    * @param type types
  621.    * @param code codes
  622.    * @param updateTime updateTime
  623.    * @param url URL
  624.    * @param data data
  625.    * @return Notification, JSONNotification, URLNotification, or DefaultNotification
  626.    * @throws Exception if error occurs
  627.    */
  628.   protected Notification parseNotification(
  629.       final String created,
  630.       final String expires,
  631.       final String source,
  632.       final String type,
  633.       final String code,
  634.       final Long updateTime,
  635.       final String url,
  636.       final String data) throws Exception {
  637.     final Notification n;
  638.     final ProductId id = new ProductId(source, type, code, new Date(updateTime));
  639.     final Date expiresDate = Date.from(Instant.parse(expires));
  640.     if (!"".equals(created) && data != null) {
  641.       Product product = new JsonProduct().getProduct(
  642.           Json.createReader(
  643.             new ByteArrayInputStream(data.getBytes())
  644.           ).readObject());
  645.       n = new JsonNotification(Instant.parse(created), product);
  646.     } else if (!"".equals(url)) {
  647.       n = new URLNotification(id, expiresDate, null, new URL(url));
  648.     } else {
  649.       n = new DefaultNotification(id, expiresDate, null);
  650.     }
  651.     return n;
  652.   }

  653. }