AwsProductReceiver.java

  1. package gov.usgs.earthquake.aws;

  2. import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
  3. import gov.usgs.earthquake.distribution.HeartbeatListener;
  4. import gov.usgs.earthquake.distribution.WebSocketClient;
  5. import gov.usgs.earthquake.distribution.WebSocketListener;
  6. import gov.usgs.util.Config;

  7. import javax.json.Json;
  8. import javax.json.JsonObject;
  9. import javax.json.JsonReader;
  10. import javax.websocket.CloseReason;
  11. import javax.websocket.Session;
  12. import java.io.IOException;
  13. import java.io.StringReader;
  14. import java.net.URI;
  15. import java.time.Instant;
  16. import java.time.temporal.ChronoUnit;
  17. import java.util.logging.Level;
  18. import java.util.logging.Logger;

  19. /**
  20.  * Receives notifications from a PDL notification web socket.
  21.  *
  22.  * After initial connection, ignores broadcasts until catch up process is complete.
  23.  *
  24.  * Catch up involves sending a "products_created_after" request with the latest
  25.  * notification "created" timestamp, and processing products until either the
  26.  * last product matches the last broadcast or there are no more products after
  27.  * the latest notification "created" timestamp.
  28.  */
  29. public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {

  /** Logger shared by this class. */
  public static final Logger LOGGER = Logger
          .getLogger(AwsProductReceiver.class.getName());
  /** Property name for the web socket URI; also used as a key in the tracking data JSON. */
  public static final String URI_PROPERTY = "url";
  /** Key for the "createdAfter" timestamp in the tracking data JSON. */
  public static final String CREATED_AFTER_PROPERTY = "createdAfter";
  /** Property name for a TrackingIndex object defined in the global config. */
  public static final String TRACKING_INDEX_PROPERTY = "trackingIndex";
  /** Property name for the tracking file name (used when no trackingIndex is configured). */
  public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  /** Property name for the number of connect attempts passed to the web socket client. */
  public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  /** Property name for the connect timeout passed to the web socket client. */
  public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  /** Property name for how far back to catch up when no createdAfter is known. */
  public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";

  /**
   * Default tracking file name.
   * NOTE(review): not referenced by the code visible in this class - confirm usage elsewhere.
   */
  public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
  /** Default number of connect attempts. */
  public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  /** Default connect timeout (presumably milliseconds - confirm against WebSocketClient). */
  public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  /** Default initial catch up age, in days (multiplied by 86400 seconds when used). */
  public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";

  /** Web socket URI to connect to. */
  private URI uri;
  /** Tracking file name read from configuration, may be null. */
  private String trackingFileName;
  /** Connect attempts passed to the web socket client. */
  private int attempts;
  /** Connect timeout passed to the web socket client. */
  private long timeout;

  /** Index where the "createdAfter" tracking data is stored. */
  private TrackingIndex trackingIndex;
  /** Web socket client, created in startup. */
  private WebSocketClient client;

  /** Websocket session, set in onOpen and cleared in onClose. */
  private Session session;

  /** µs timestamp of last message that has been processed */
  protected Instant createdAfter = null;

  /** How far back to check when first connecting (days). */
  protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);

  /** Last (newest) broadcast message that has been received (used for catch up). */
  protected JsonNotification lastBroadcast = null;
  /** Id of the last broadcast stored in {@link #lastBroadcast}. */
  protected Long lastBroadcastId = null;
  /** Whether to process broadcast messages (true after catching up). */
  protected boolean processBroadcast = false;

  /** Whether currently catching up. */
  protected boolean catchUpRunning = false;
  /** Sync object for catchUp state. */
  protected final Object catchUpSync = new Object();
  /** Thread where catch up process runs. */
  protected Thread catchUpThread = null;
  /** Whether thread should continue running (shutdown flag). */
  protected boolean catchUpThreadRunning = false;
  /** Time the last catch up request was sent (for response timeouts). */
  protected Instant lastCatchUpSent = null;

  83.   @Override
  84.   public void configure(Config config) throws Exception {
  85.     super.configure(config);

  86.     uri = new URI(config.getProperty(URI_PROPERTY));
  87.     attempts = Integer.parseInt(
  88.         config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
  89.     timeout = Long.parseLong(
  90.         config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
  91.     initialCatchUpAge = Double.valueOf(
  92.         config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));

  93.     final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
  94.     if (trackingIndexName != null) {
  95.       LOGGER.config("[" + getName() + "] loading tracking index "
  96.           + trackingIndexName);
  97.       try {
  98.         // read object from global config
  99.         trackingIndex = (TrackingIndex) Config.getConfig().getObject(trackingIndexName);
  100.       } catch (Exception e) {
  101.         LOGGER.log(
  102.             Level.WARNING,
  103.             "[" + getName() + "] error loading tracking index "
  104.                 + trackingIndexName,
  105.             e);
  106.       }
  107.     } else {
  108.       trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY);
  109.       if (trackingFileName != null) {
  110.         LOGGER.config("[" + getName() + "] creating tracking index at"
  111.             + trackingFileName);
  112.         trackingIndex = new TrackingIndex(
  113.             TrackingIndex.DEFAULT_DRIVER,
  114.             "jdbc:sqlite:" + trackingFileName);
  115.       }
  116.     }
  117.   }

  118.   /**
  119.    * Called when connection is first opened.
  120.    *
  121.    * Start catch up process.
  122.    */
  123.   @Override
  124.   public void onOpen(Session session) throws IOException {
  125.     LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId());
  126.     // save session
  127.     this.session = session;

  128.     // start catch up process
  129.     LOGGER.info("[" + getName() + "] Starting catch up");
  130.     // ignore broadcast until caught up
  131.     processBroadcast = false;
  132.     startCatchUp();
  133.   }

  134.   /**
  135.    * Called when connection is closed, either because shutdown on this end or
  136.    * closed by server.
  137.    */
  138.   @Override
  139.   public void onClose(Session session, CloseReason closeReason) {
  140.     LOGGER.info("[" + getName() + "] onClose " + closeReason.toString());
  141.     this.session = null;

  142.     // cannot catch up when not connected, restart in onOpen
  143.     stopCatchUp();
  144.   }

  145.   /**
  146.    * Message handler function passed to WebSocketClient
  147.    *
  148.    * Parses the message as JSON, and checks "action" property to route message
  149.    * for handling.
  150.    *
  151.    * Synchronized to process messages in order, since onProductsCreatedAfter
  152.    * compares state of latest product to determine whether caught up and if
  153.    * broadcasts should be processed.
  154.    *
  155.    * @param message Message notification - string
  156.    */
  157.   @Override
  158.   synchronized public void onMessage(String message) throws IOException {
  159.     try (final JsonReader reader = Json.createReader(new StringReader(message))) {
  160.       // parse message
  161.       final JsonObject json = reader.readObject();
  162.       final String action = json.getString("action");

  163.       if ("broadcast".equals(action)) {
  164.         onBroadcast(json);
  165.       } else if ("product".equals(action)) {
  166.         onProduct(json);
  167.       } else if ("products_created_after".equals(action)) {
  168.         onProductsCreatedAfter(json);
  169.       }
  170.     } catch (Exception e) {
  171.       LOGGER.log(
  172.           Level.WARNING,
  173.           "[" + getName() + "] exception while processing message '" + message + "'",
  174.           e);
  175.       throw new IOException(e);
  176.     }
  177.   }

  178.   /**
  179.    * Handle a message with "action"="broadcast".
  180.    *
  181.    * If caught up process notification as usual, otherwise save notification
  182.    * to help detect when caught up.
  183.    *
  184.    * @param json JSON Message
  185.    * @throws Exception Exception
  186.    */
  187.   protected void onBroadcast(final JsonObject json) throws Exception {
  188.     final JsonNotification notification = new JsonNotification(
  189.         json.getJsonObject("notification"));
  190.     Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
  191.     LOGGER.finer("[" + getName() + "]"
  192.         + " onBroadcast(" + notification.getProductId() + ")"
  193.         + " sequence=" + broadcastId + ", lastSequence=" + lastBroadcastId);

  194.     if (processBroadcast &&
  195.         // sanity check, broadcast ids are expected to increment
  196.         lastBroadcastId != null
  197.         && broadcastId != (lastBroadcastId + 1)
  198.     ) {
  199.       // may have missed message
  200.       LOGGER.info("[" + getName() + "] broadcast ids out of sequence"
  201.           + " (at " + lastBroadcastId + ", received " + broadcastId + ")"
  202.           + ", switching to catch up mode");
  203.       processBroadcast = false;
  204.       startCatchUp();
  205.     }

  206.     // track last broadcast for catch up process (as long as newer)
  207.     if (lastBroadcastId == null || broadcastId > lastBroadcastId) {
  208.       lastBroadcastId = broadcastId;
  209.       lastBroadcast = notification;
  210.     }

  211.     // process message if not in catch up mode
  212.     if (processBroadcast) {
  213.       onJsonNotification(notification);
  214.     }
  215.   }

  216.   /**
  217.    * Process a received notification and update current "created" timestamp.
  218.    *
  219.    * @param notification JSON Notification
  220.    * @throws Exception Exception
  221.    */
  222.   protected void onJsonNotification(final JsonNotification notification) throws Exception {
  223.     // receive and notify listeners
  224.     receiveNotification(notification);
  225.     // update tracking file
  226.     this.createdAfter = notification.created;
  227.     writeTrackingData();
  228.     // send heartbeat
  229.     HeartbeatListener.sendHeartbeatMessage(getName(), "createdAfter", createdAfter.toString());
  230.   }

  231.   /**
  232.    * Handle a message with "action"="product", which is received during catch up.
  233.    *
  234.    * @param json JSON Message
  235.    * @throws Exception Exception
  236.    */
  237.   protected void onProduct(final JsonObject json) throws Exception {
  238.     final JsonNotification notification = new JsonNotification(
  239.         json.getJsonObject("notification"));
  240.     LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
  241.     onJsonNotification(notification);
  242.   }

  243.   /**
  244.    * Handle a message with "action"="products_created_after", which is received
  245.    * during catch up.
  246.    *
  247.    * Indicates the end of a response from a "products_created_after" request.
  248.    * Check whether caught up, and either switch to broadcast mode or continue
  249.    * catch up process.
  250.    *
  251.    * @param json JSON Message
  252.    * @throws Exception Exception
  253.    */
  254.   protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
  255.     final String after = json.getString("created_after");
  256.     final int count = json.getInt("count");
  257.     LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after
  258.         + ", " + count + " products)");

  259.     // notify background thread that a response was received,
  260.     // as well as pausing messages until restarted below (if needed)
  261.     stopCatchUp();

  262.     // check whether caught up
  263.     if (
  264.         // if a broadcast received during catchup,
  265.         (lastBroadcast != null &&
  266.             // and createdAfter is at or after last broadcast
  267.             createdAfter.compareTo(lastBroadcast.created) >= 0)
  268.         // or no additional products returned
  269.         || (lastBroadcast == null && count == 0)
  270.     ) {
  271.       // caught up
  272.       LOGGER.info("[" + getName() + "] Caught up, switching to broadcast");
  273.       processBroadcast = true;
  274.     } else {
  275.       // keep catching up
  276.       startCatchUp();
  277.     }
  278.   }

  279.   /**
  280.    * Catch up process.
  281.    *
  282.    * Do not run directly, use {@link #startCatchUpThread()} and
  283.    * {@link #stopCatchUpThread()} to start and stop the process.
  284.    *
  285.    * Process waits until {@link #startCatchUp()} is called,
  286.    * and uses {@link #throttleQueues()} between sends.
  287.    */
  288.   @Override
  289.   public void run() {
  290.     while (catchUpThreadRunning) {
  291.       try {
  292.         synchronized (catchUpSync) {
  293.           if (!catchUpRunning) {
  294.             catchUpSync.wait();
  295.             continue;
  296.           }
  297.           if (lastCatchUpSent != null) {
  298.             // message already sent, wait for timeout
  299.             Instant now = Instant.now();
  300.             Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
  301.             if (now.isBefore(timeout)) {
  302.               catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
  303.               continue;
  304.             } else {
  305.               // timed out
  306.               LOGGER.warning("No products_created_after response"
  307.                   + ", sent at " + lastCatchUpSent.toString());
  308.               // fall through
  309.             }
  310.           }
  311.         }

  312.         // ready to send, but block until done throttling
  313.         throttleQueues();

  314.         try {
  315.           synchronized (catchUpSync) {
  316.             // connection may have closed while throttling
  317.             if (!catchUpRunning) {
  318.               continue;
  319.             }
  320.             sendProductsCreatedAfter();
  321.             // track when sent
  322.             lastCatchUpSent = Instant.now();
  323.           }
  324.         } catch (Exception e){
  325.           LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
  326.           if (catchUpThreadRunning && catchUpRunning) {
  327.             // wait before next attempt
  328.             Thread.sleep(1000);
  329.           }
  330.         }
  331.       } catch (InterruptedException e) {
  332.         // probably stopping
  333.       }
  334.     }
  335.   }

  336.   /**
  337.    * Send an "action"="products_created_after" request, which is part of the
  338.    * catch up process.
  339.    *
  340.    * The server will reply with zero or more "action"="product" messages, and
  341.    * then one "action"="products_created_after" message to indicate the request
  342.    * is complete.
  343.    *
  344.    * @throws IOException IOException
  345.    */
  346.   protected void sendProductsCreatedAfter() throws IOException {
  347.     // set default for created after
  348.     if (this.createdAfter == null) {
  349.       this.createdAfter = Instant.now().minusSeconds(
  350.           Math.round(initialCatchUpAge * 86400));
  351.     }
  352.     String request = Json.createObjectBuilder()
  353.         .add("action", "products_created_after")
  354.         .add("created_after", this.createdAfter.toString())
  355.         .build()
  356.         .toString();
  357.     LOGGER.fine("[" + getName() + "] Sending " + request);
  358.     // start catch up process
  359.     this.session.getBasicRemote().sendText(request);
  360.   }


  361.   /**
  362.    * Notify running background thread to start catch up process.
  363.    */
  364.   protected void startCatchUp() {
  365.     // notify background thread to start catch up
  366.     synchronized (catchUpSync) {
  367.       catchUpRunning = true;
  368.       // clear sent time
  369.       lastCatchUpSent = null;
  370.       catchUpSync.notify();
  371.     }
  372.   }

  373.   /**
  374.    * Start background thread for catch up process.
  375.    */
  376.   protected void startCatchUpThread() {
  377.     if (catchUpThread != null) {
  378.       throw new IllegalStateException("catchUp thread already started");
  379.     }
  380.     synchronized (catchUpSync) {
  381.       catchUpThreadRunning = true;
  382.       catchUpThread = new Thread(this);
  383.     }
  384.     catchUpThread.start();
  385.   }

  386.   /**
  387.    * Notify running background thread to stop catch up process.
  388.    */
  389.   protected void stopCatchUp() {
  390.     synchronized (catchUpSync) {
  391.       // stop catch up
  392.       catchUpRunning = false;
  393.       // clear sent time
  394.       lastCatchUpSent = null;
  395.       catchUpSync.notify();
  396.     }
  397.   }

  398.   /**
  399.    * Stop background thread for catch up process.
  400.    */
  401.   protected void stopCatchUpThread() {
  402.     if (catchUpThread == null) {
  403.       return;
  404.     }
  405.     // stop catch up thread
  406.     try {
  407.       synchronized (catchUpSync) {
  408.         // orderly shutdown
  409.         catchUpThreadRunning = false;
  410.         catchUpSync.notify();
  411.       }
  412.       // interrupt just in case
  413.       catchUpThread.interrupt();
  414.       catchUpThread.join();
  415.     } catch (Exception e) {
  416.       LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
  417.     } finally {
  418.       catchUpThread = null;
  419.     }
  420.   }

  421.   @Override
  422.   public void onConnectFail() {
  423.     // client failed to connect
  424.     LOGGER.info("[" + getName() + "] onConnectFail");
  425.   }

  426.   @Override
  427.   public void onReconnectFail() {
  428.     // failed to reconnect after close
  429.     LOGGER.info("[" + getName() + "] onReconnectFail");
  430.   }

  431.   /**
  432.    * Reads createdAfter from a tracking file if it exists,
  433.    * then connects to web socket.
  434.    *
  435.    * @throws Exception Exception
  436.    */
  437.   @Override
  438.   public void startup() throws Exception{
  439.     super.startup();
  440.     if (trackingIndex == null) {
  441.       trackingIndex = new TrackingIndex();
  442.     }
  443.     trackingIndex.startup();

  444.     //read sequence from tracking file if other parameters agree
  445.     JsonObject json = readTrackingData();
  446.     if (json != null && json.getString(URI_PROPERTY).equals(uri.toString())) {
  447.       createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
  448.     }

  449.     // open websocket
  450.     client = new WebSocketClient(uri, this, attempts, timeout, true);

  451.     // start catch up process
  452.     startCatchUpThread();
  453.   }

  454.   /**
  455.    * Closes web socket
  456.    * @throws Exception Exception
  457.    */
  458.   @Override
  459.   public void shutdown() throws Exception {
  460.     // stop catch up process
  461.     stopCatchUpThread();
  462.     // close socket
  463.     try {
  464.       client.shutdown();
  465.     } catch (Exception e) {}
  466.     super.shutdown();
  467.   }

  468.   /**
  469.    * Reads tracking file.
  470.    *
  471.    * @return  JsonObject tracking file
  472.    * @throws Exception Exception
  473.    */
  474.   public JsonObject readTrackingData() throws Exception {
  475.     // use name as key
  476.     return trackingIndex.getTrackingData(getName());
  477.   }

  478.   /**
  479.    * Writes tracking file.
  480.    *
  481.    * @throws Exception Exception
  482.    */
  483.   public void writeTrackingData() throws Exception {
  484.     JsonObject json = Json.createObjectBuilder()
  485.             .add(URI_PROPERTY, uri.toString())
  486.             .add(CREATED_AFTER_PROPERTY, createdAfter.toString())
  487.             .build();
  488.     // use name as key
  489.     trackingIndex.setTrackingData(getName(), json);
  490.   }

  /**
   * Get the web socket URI, as read from the "url" property or set via
   * {@link #setURI(URI)}.
   *
   * @return web socket URI
   */
  public URI getURI() {
    return uri;
  }

  /**
   * Set the web socket URI; it is used when the web socket client is created
   * in startup.
   *
   * @param uri web socket URI
   */
  public void setURI(final URI uri) {
    this.uri = uri;
  }

  /**
   * Get the tracking file name.
   *
   * @return name of tracking file, or null when none has been configured or set
   */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /**
   * Set the tracking file name.
   *
   * Only the stored name is updated here; the tracking index itself is
   * created in configure or startup.
   *
   * @param trackingFileName name of tracking file
   */
  public void setTrackingFileName(final String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /**
   * Get the created timestamp of the last processed notification, which is
   * also the position used for the next catch up request.
   *
   * @return createdAfter, or null when not yet set
   */
  public Instant getCreatedAfter() {
    return createdAfter;
  }

  /**
   * Set the position used for the next catch up request.
   *
   * Note that startup replaces this value when stored tracking data exists
   * for the same URI.
   *
   * @param createdAfter createdAfter
   */
  public void setCreatedAfter(final Instant createdAfter) {
    this.createdAfter = createdAfter;
  }

  /**
   * Get the connect attempts value passed to the web socket client.
   *
   * @return attempts
   */
  public int getAttempts() {
    return attempts;
  }

  /**
   * Set the connect attempts value; it is used when the web socket client is
   * created in startup.
   *
   * @param attempts attempts
   */
  public void setAttempts(final int attempts) {
    this.attempts = attempts;
  }

  /**
   * Get the connect timeout value passed to the web socket client.
   *
   * @return timeout (units are defined by WebSocketClient - presumably milliseconds, confirm)
   */
  public long getTimeout() {
    return timeout;
  }

  /**
   * Set the connect timeout value; it is used when the web socket client is
   * created in startup.
   *
   * @param timeout long timeout
   */
  public void setTimeout(final long timeout) {
    this.timeout = timeout;
  }

  561. }