AwsProductReceiver.java
- package gov.usgs.earthquake.aws;
- import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
- import gov.usgs.earthquake.distribution.HeartbeatListener;
- import gov.usgs.earthquake.distribution.WebSocketClient;
- import gov.usgs.earthquake.distribution.WebSocketListener;
- import gov.usgs.util.Config;
- import javax.json.Json;
- import javax.json.JsonObject;
- import javax.json.JsonReader;
- import javax.websocket.CloseReason;
- import javax.websocket.Session;
- import java.io.IOException;
- import java.io.StringReader;
- import java.net.URI;
- import java.time.Instant;
- import java.time.temporal.ChronoUnit;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Receives notifications from a PDL notification web socket.
- *
- * After initial connection, ignores broadcasts until catch up process is complete.
- *
- * Catch up involves sending a "products_created_after" request with the latest
- * notification "created" timestamp, and processing products until either the
- * last product matches the last broadcast or there are no more products after
- * the latest notification "created" timestamp.
- */
- public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {
- /** Initialzation of logger. For us later in file. */
- public static final Logger LOGGER = Logger
- .getLogger(AwsProductReceiver.class.getName());
- /** Variable for URI string */
- public static final String URI_PROPERTY = "url";
- /** Variable for createdAfter string */
- public static final String CREATED_AFTER_PROPERTY = "createdAfter";
- /** Variable for trackingIndex string */
- public static final String TRACKING_INDEX_PROPERTY = "trackingIndex";
- /** Variable for trackingFileName string */
- public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
- /** Variable for connectAttempts string */
- public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
- /** Variable for connectTimeout string */
- public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
- /** Variable for initialCatchUpAge string */
- public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";
- /** Variable for tracking file. Links to data/AwsReceiver.json */
- public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
- /** Variable for connect attempts. Set to 5 */
- public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
- /** Variable for timout. Set to 1000 */
- public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
- /** Variable for catchup age. Set to 7.0 */
- public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";
- private URI uri;
- private String trackingFileName;
- private int attempts;
- private long timeout;
- private TrackingIndex trackingIndex;
- private WebSocketClient client;
- /** Websocket session */
- private Session session;
- /** µs timestamp of last message that has been processed */
- protected Instant createdAfter = null;
- /** How far back to check when first connecting. */
- protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);
- /** last broadcast message that has been processed (used for catch up) */
- protected JsonNotification lastBroadcast = null;
- /** ID of the previously mentioned last broadcast */
- protected Long lastBroadcastId = null;
- /** whether to process broadcast messages (after catching up). */
- protected boolean processBroadcast = false;
- /** whether currenting catching up. */
- protected boolean catchUpRunning = false;
- /** sync object for catchUp state. */
- protected final Object catchUpSync = new Object();
- /** thread where catch up process runs. */
- protected Thread catchUpThread = null;
- /** whether thread should continue running (shutdown flag) */
- protected boolean catchUpThreadRunning = false;
- /** last catch up message sent (for response timeouts) */
- protected Instant lastCatchUpSent = null;
- @Override
- public void configure(Config config) throws Exception {
- super.configure(config);
- uri = new URI(config.getProperty(URI_PROPERTY));
- attempts = Integer.parseInt(
- config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
- timeout = Long.parseLong(
- config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
- initialCatchUpAge = Double.valueOf(
- config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));
- final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
- if (trackingIndexName != null) {
- LOGGER.config("[" + getName() + "] loading tracking index "
- + trackingIndexName);
- try {
- // read object from global config
- trackingIndex = (TrackingIndex) Config.getConfig().getObject(trackingIndexName);
- } catch (Exception e) {
- LOGGER.log(
- Level.WARNING,
- "[" + getName() + "] error loading tracking index "
- + trackingIndexName,
- e);
- }
- } else {
- trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY);
- if (trackingFileName != null) {
- LOGGER.config("[" + getName() + "] creating tracking index at"
- + trackingFileName);
- trackingIndex = new TrackingIndex(
- TrackingIndex.DEFAULT_DRIVER,
- "jdbc:sqlite:" + trackingFileName);
- }
- }
- }
- /**
- * Called when connection is first opened.
- *
- * Start catch up process.
- */
- @Override
- public void onOpen(Session session) throws IOException {
- LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId());
- // save session
- this.session = session;
- // start catch up process
- LOGGER.info("[" + getName() + "] Starting catch up");
- // ignore broadcast until caught up
- processBroadcast = false;
- startCatchUp();
- }
- /**
- * Called when connection is closed, either because shutdown on this end or
- * closed by server.
- */
- @Override
- public void onClose(Session session, CloseReason closeReason) {
- LOGGER.info("[" + getName() + "] onClose " + closeReason.toString());
- this.session = null;
- // cannot catch up when not connected, restart in onOpen
- stopCatchUp();
- }
- /**
- * Message handler function passed to WebSocketClient
- *
- * Parses the message as JSON, and checks "action" property to route message
- * for handling.
- *
- * Synchronized to process messages in order, since onProductsCreatedAfter
- * compares state of latest product to determine whether caught up and if
- * broadcasts should be processed.
- *
- * @param message Message notification - string
- */
- @Override
- synchronized public void onMessage(String message) throws IOException {
- try (final JsonReader reader = Json.createReader(new StringReader(message))) {
- // parse message
- final JsonObject json = reader.readObject();
- final String action = json.getString("action");
- if ("broadcast".equals(action)) {
- onBroadcast(json);
- } else if ("product".equals(action)) {
- onProduct(json);
- } else if ("products_created_after".equals(action)) {
- onProductsCreatedAfter(json);
- }
- } catch (Exception e) {
- LOGGER.log(
- Level.WARNING,
- "[" + getName() + "] exception while processing message '" + message + "'",
- e);
- throw new IOException(e);
- }
- }
- /**
- * Handle a message with "action"="broadcast".
- *
- * If caught up process notification as usual, otherwise save notification
- * to help detect when caught up.
- *
- * @param json JSON Message
- * @throws Exception Exception
- */
- protected void onBroadcast(final JsonObject json) throws Exception {
- final JsonNotification notification = new JsonNotification(
- json.getJsonObject("notification"));
- Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
- LOGGER.finer("[" + getName() + "]"
- + " onBroadcast(" + notification.getProductId() + ")"
- + " sequence=" + broadcastId + ", lastSequence=" + lastBroadcastId);
- if (processBroadcast &&
- // sanity check, broadcast ids are expected to increment
- lastBroadcastId != null
- && broadcastId != (lastBroadcastId + 1)
- ) {
- // may have missed message
- LOGGER.info("[" + getName() + "] broadcast ids out of sequence"
- + " (at " + lastBroadcastId + ", received " + broadcastId + ")"
- + ", switching to catch up mode");
- processBroadcast = false;
- startCatchUp();
- }
- // track last broadcast for catch up process (as long as newer)
- if (lastBroadcastId == null || broadcastId > lastBroadcastId) {
- lastBroadcastId = broadcastId;
- lastBroadcast = notification;
- }
- // process message if not in catch up mode
- if (processBroadcast) {
- onJsonNotification(notification);
- }
- }
- /**
- * Process a received notification and update current "created" timestamp.
- *
- * @param notification JSON Notification
- * @throws Exception Exception
- */
- protected void onJsonNotification(final JsonNotification notification) throws Exception {
- // receive and notify listeners
- receiveNotification(notification);
- // update tracking file
- this.createdAfter = notification.created;
- writeTrackingData();
- // send heartbeat
- HeartbeatListener.sendHeartbeatMessage(getName(), "createdAfter", createdAfter.toString());
- }
- /**
- * Handle a message with "action"="product", which is received during catch up.
- *
- * @param json JSON Message
- * @throws Exception Exception
- */
- protected void onProduct(final JsonObject json) throws Exception {
- final JsonNotification notification = new JsonNotification(
- json.getJsonObject("notification"));
- LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
- onJsonNotification(notification);
- }
- /**
- * Handle a message with "action"="products_created_after", which is received
- * during catch up.
- *
- * Indicates the end of a response from a "products_created_after" request.
- * Check whether caught up, and either switch to broadcast mode or continue
- * catch up process.
- *
- * @param json JSON Message
- * @throws Exception Exception
- */
- protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
- final String after = json.getString("created_after");
- final int count = json.getInt("count");
- LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after
- + ", " + count + " products)");
- // notify background thread that a response was received,
- // as well as pausing messages until restarted below (if needed)
- stopCatchUp();
- // check whether caught up
- if (
- // if a broadcast received during catchup,
- (lastBroadcast != null &&
- // and createdAfter is at or after last broadcast
- createdAfter.compareTo(lastBroadcast.created) >= 0)
- // or no additional products returned
- || (lastBroadcast == null && count == 0)
- ) {
- // caught up
- LOGGER.info("[" + getName() + "] Caught up, switching to broadcast");
- processBroadcast = true;
- } else {
- // keep catching up
- startCatchUp();
- }
- }
- /**
- * Catch up process.
- *
- * Do not run directly, use {@link #startCatchUpThread()} and
- * {@link #stopCatchUpThread()} to start and stop the process.
- *
- * Process waits until {@link #startCatchUp()} is called,
- * and uses {@link #throttleQueues()} between sends.
- */
- @Override
- public void run() {
- while (catchUpThreadRunning) {
- try {
- synchronized (catchUpSync) {
- if (!catchUpRunning) {
- catchUpSync.wait();
- continue;
- }
- if (lastCatchUpSent != null) {
- // message already sent, wait for timeout
- Instant now = Instant.now();
- Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
- if (now.isBefore(timeout)) {
- catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
- continue;
- } else {
- // timed out
- LOGGER.warning("No products_created_after response"
- + ", sent at " + lastCatchUpSent.toString());
- // fall through
- }
- }
- }
- // ready to send, but block until done throttling
- throttleQueues();
- try {
- synchronized (catchUpSync) {
- // connection may have closed while throttling
- if (!catchUpRunning) {
- continue;
- }
- sendProductsCreatedAfter();
- // track when sent
- lastCatchUpSent = Instant.now();
- }
- } catch (Exception e){
- LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
- if (catchUpThreadRunning && catchUpRunning) {
- // wait before next attempt
- Thread.sleep(1000);
- }
- }
- } catch (InterruptedException e) {
- // probably stopping
- }
- }
- }
- /**
- * Send an "action"="products_created_after" request, which is part of the
- * catch up process.
- *
- * The server will reply with zero or more "action"="product" messages, and
- * then one "action"="products_created_after" message to indicate the request
- * is complete.
- *
- * @throws IOException IOException
- */
- protected void sendProductsCreatedAfter() throws IOException {
- // set default for created after
- if (this.createdAfter == null) {
- this.createdAfter = Instant.now().minusSeconds(
- Math.round(initialCatchUpAge * 86400));
- }
- String request = Json.createObjectBuilder()
- .add("action", "products_created_after")
- .add("created_after", this.createdAfter.toString())
- .build()
- .toString();
- LOGGER.fine("[" + getName() + "] Sending " + request);
- // start catch up process
- this.session.getBasicRemote().sendText(request);
- }
- /**
- * Notify running background thread to start catch up process.
- */
- protected void startCatchUp() {
- // notify background thread to start catch up
- synchronized (catchUpSync) {
- catchUpRunning = true;
- // clear sent time
- lastCatchUpSent = null;
- catchUpSync.notify();
- }
- }
- /**
- * Start background thread for catch up process.
- */
- protected void startCatchUpThread() {
- if (catchUpThread != null) {
- throw new IllegalStateException("catchUp thread already started");
- }
- synchronized (catchUpSync) {
- catchUpThreadRunning = true;
- catchUpThread = new Thread(this);
- }
- catchUpThread.start();
- }
- /**
- * Notify running background thread to stop catch up process.
- */
- protected void stopCatchUp() {
- synchronized (catchUpSync) {
- // stop catch up
- catchUpRunning = false;
- // clear sent time
- lastCatchUpSent = null;
- catchUpSync.notify();
- }
- }
- /**
- * Stop background thread for catch up process.
- */
- protected void stopCatchUpThread() {
- if (catchUpThread == null) {
- return;
- }
- // stop catch up thread
- try {
- synchronized (catchUpSync) {
- // orderly shutdown
- catchUpThreadRunning = false;
- catchUpSync.notify();
- }
- // interrupt just in case
- catchUpThread.interrupt();
- catchUpThread.join();
- } catch (Exception e) {
- LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
- } finally {
- catchUpThread = null;
- }
- }
- @Override
- public void onConnectFail() {
- // client failed to connect
- LOGGER.info("[" + getName() + "] onConnectFail");
- }
- @Override
- public void onReconnectFail() {
- // failed to reconnect after close
- LOGGER.info("[" + getName() + "] onReconnectFail");
- }
- /**
- * Reads createdAfter from a tracking file if it exists,
- * then connects to web socket.
- *
- * @throws Exception Exception
- */
- @Override
- public void startup() throws Exception{
- super.startup();
- if (trackingIndex == null) {
- trackingIndex = new TrackingIndex();
- }
- trackingIndex.startup();
- //read sequence from tracking file if other parameters agree
- JsonObject json = readTrackingData();
- if (json != null && json.getString(URI_PROPERTY).equals(uri.toString())) {
- createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
- }
- // open websocket
- client = new WebSocketClient(uri, this, attempts, timeout, true);
- // start catch up process
- startCatchUpThread();
- }
- /**
- * Closes web socket
- * @throws Exception Exception
- */
- @Override
- public void shutdown() throws Exception {
- // stop catch up process
- stopCatchUpThread();
- // close socket
- try {
- client.shutdown();
- } catch (Exception e) {}
- super.shutdown();
- }
- /**
- * Reads tracking file.
- *
- * @return JsonObject tracking file
- * @throws Exception Exception
- */
- public JsonObject readTrackingData() throws Exception {
- // use name as key
- return trackingIndex.getTrackingData(getName());
- }
- /**
- * Writes tracking file.
- *
- * @throws Exception Exception
- */
- public void writeTrackingData() throws Exception {
- JsonObject json = Json.createObjectBuilder()
- .add(URI_PROPERTY, uri.toString())
- .add(CREATED_AFTER_PROPERTY, createdAfter.toString())
- .build();
- // use name as key
- trackingIndex.setTrackingData(getName(), json);
- }
- /**
- * Getter for URI
- * @return URI
- */
- public URI getURI() {
- return uri;
- }
- /**
- * Setter for URI
- * @param uri URI
- */
- public void setURI(final URI uri) {
- this.uri = uri;
- }
- /**
- * Getter for trackingFileName
- * @return name of tracking file
- */
- public String getTrackingFileName() {
- return trackingFileName;
- }
- /**
- * Setter for trackingFileName
- * @param trackingFileName trackingFileName
- */
- public void setTrackingFileName(final String trackingFileName) {
- this.trackingFileName = trackingFileName;
- }
- /**
- * Getter for createdAfter
- * @return createdAfter
- */
- public Instant getCreatedAfter() {
- return createdAfter;
- }
- /**
- * Setter for createdAfter
- * @param createdAfter createdAfter
- */
- public void setCreatedAfter(final Instant createdAfter) {
- this.createdAfter = createdAfter;
- }
- /**
- * Getter for attempts
- * @return attempts
- */
- public int getAttempts() {
- return attempts;
- }
- /**
- * Setter for attempts
- * @param attempts attempts
- */
- public void setAttempts(final int attempts) {
- this.attempts = attempts;
- }
- /**
- * Getter for timeout
- * @return timeout
- */
- public long getTimeout() {
- return timeout;
- }
- /**
- * Setter for timeout
- * @param timeout long timeout
- */
- public void setTimeout(final long timeout) {
- this.timeout = timeout;
- }
- }