AwsProductReceiver.java
package gov.usgs.earthquake.aws;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.earthquake.distribution.WebSocketClient;
import gov.usgs.earthquake.distribution.WebSocketListener;
import gov.usgs.util.Config;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Receives notifications from a PDL notification web socket.
*
* After initial connection, ignores broadcasts until catch up process is complete.
*
* Catch up involves sending a "products_created_after" request with the latest
* notification "created" timestamp, and processing products until either the
* last product matches the last broadcast or there are no more products after
* the latest notification "created" timestamp.
*/
public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {
/** Initialzation of logger. For us later in file. */
public static final Logger LOGGER = Logger
.getLogger(AwsProductReceiver.class.getName());
/** Variable for URI string */
public static final String URI_PROPERTY = "url";
/** Variable for createdAfter string */
public static final String CREATED_AFTER_PROPERTY = "createdAfter";
/** Variable for trackingIndex string */
public static final String TRACKING_INDEX_PROPERTY = "trackingIndex";
/** Variable for trackingFileName string */
public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
/** Variable for connectAttempts string */
public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
/** Variable for connectTimeout string */
public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
/** Variable for initialCatchUpAge string */
public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";
/** Variable for tracking file. Links to data/AwsReceiver.json */
public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
/** Variable for connect attempts. Set to 5 */
public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
/** Variable for timout. Set to 1000 */
public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
/** Variable for catchup age. Set to 7.0 */
public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";
private URI uri;
private String trackingFileName;
private int attempts;
private long timeout;
private TrackingIndex trackingIndex;
private WebSocketClient client;
/** Websocket session */
private Session session;
/** µs timestamp of last message that has been processed */
protected Instant createdAfter = null;
/** How far back to check when first connecting. */
protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);
/** last broadcast message that has been processed (used for catch up) */
protected JsonNotification lastBroadcast = null;
/** ID of the previously mentioned last broadcast */
protected Long lastBroadcastId = null;
/** whether to process broadcast messages (after catching up). */
protected boolean processBroadcast = false;
/** whether currenting catching up. */
protected boolean catchUpRunning = false;
/** sync object for catchUp state. */
protected final Object catchUpSync = new Object();
/** thread where catch up process runs. */
protected Thread catchUpThread = null;
/** whether thread should continue running (shutdown flag) */
protected boolean catchUpThreadRunning = false;
/** last catch up message sent (for response timeouts) */
protected Instant lastCatchUpSent = null;
@Override
public void configure(Config config) throws Exception {
super.configure(config);
uri = new URI(config.getProperty(URI_PROPERTY));
attempts = Integer.parseInt(
config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
timeout = Long.parseLong(
config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
initialCatchUpAge = Double.valueOf(
config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));
final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
if (trackingIndexName != null) {
LOGGER.config("[" + getName() + "] loading tracking index "
+ trackingIndexName);
try {
// read object from global config
trackingIndex = (TrackingIndex) Config.getConfig().getObject(trackingIndexName);
} catch (Exception e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] error loading tracking index "
+ trackingIndexName,
e);
}
} else {
trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY);
if (trackingFileName != null) {
LOGGER.config("[" + getName() + "] creating tracking index at"
+ trackingFileName);
trackingIndex = new TrackingIndex(
TrackingIndex.DEFAULT_DRIVER,
"jdbc:sqlite:" + trackingFileName);
}
}
}
/**
* Called when connection is first opened.
*
* Start catch up process.
*/
@Override
public void onOpen(Session session) throws IOException {
LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId());
// save session
this.session = session;
// start catch up process
LOGGER.info("[" + getName() + "] Starting catch up");
// ignore broadcast until caught up
processBroadcast = false;
startCatchUp();
}
/**
* Called when connection is closed, either because shutdown on this end or
* closed by server.
*/
@Override
public void onClose(Session session, CloseReason closeReason) {
LOGGER.info("[" + getName() + "] onClose " + closeReason.toString());
this.session = null;
// cannot catch up when not connected, restart in onOpen
stopCatchUp();
}
/**
* Message handler function passed to WebSocketClient
*
* Parses the message as JSON, and checks "action" property to route message
* for handling.
*
* Synchronized to process messages in order, since onProductsCreatedAfter
* compares state of latest product to determine whether caught up and if
* broadcasts should be processed.
*
* @param message Message notification - string
*/
@Override
synchronized public void onMessage(String message) throws IOException {
try (final JsonReader reader = Json.createReader(new StringReader(message))) {
// parse message
final JsonObject json = reader.readObject();
final String action = json.getString("action");
if ("broadcast".equals(action)) {
onBroadcast(json);
} else if ("product".equals(action)) {
onProduct(json);
} else if ("products_created_after".equals(action)) {
onProductsCreatedAfter(json);
}
} catch (Exception e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] exception while processing message '" + message + "'",
e);
throw new IOException(e);
}
}
/**
* Handle a message with "action"="broadcast".
*
* If caught up process notification as usual, otherwise save notification
* to help detect when caught up.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
LOGGER.finer("[" + getName() + "]"
+ " onBroadcast(" + notification.getProductId() + ")"
+ " sequence=" + broadcastId + ", lastSequence=" + lastBroadcastId);
if (processBroadcast &&
// sanity check, broadcast ids are expected to increment
lastBroadcastId != null
&& broadcastId != (lastBroadcastId + 1)
) {
// may have missed message
LOGGER.info("[" + getName() + "] broadcast ids out of sequence"
+ " (at " + lastBroadcastId + ", received " + broadcastId + ")"
+ ", switching to catch up mode");
processBroadcast = false;
startCatchUp();
}
// track last broadcast for catch up process (as long as newer)
if (lastBroadcastId == null || broadcastId > lastBroadcastId) {
lastBroadcastId = broadcastId;
lastBroadcast = notification;
}
// process message if not in catch up mode
if (processBroadcast) {
onJsonNotification(notification);
}
}
/**
* Process a received notification and update current "created" timestamp.
*
* @param notification JSON Notification
* @throws Exception Exception
*/
protected void onJsonNotification(final JsonNotification notification) throws Exception {
// receive and notify listeners
receiveNotification(notification);
// update tracking file
this.createdAfter = notification.created;
writeTrackingData();
// send heartbeat
HeartbeatListener.sendHeartbeatMessage(getName(), "createdAfter", createdAfter.toString());
}
/**
* Handle a message with "action"="product", which is received during catch up.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onProduct(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
onJsonNotification(notification);
}
/**
* Handle a message with "action"="products_created_after", which is received
* during catch up.
*
* Indicates the end of a response from a "products_created_after" request.
* Check whether caught up, and either switch to broadcast mode or continue
* catch up process.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
final String after = json.getString("created_after");
final int count = json.getInt("count");
LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after
+ ", " + count + " products)");
// notify background thread that a response was received,
// as well as pausing messages until restarted below (if needed)
stopCatchUp();
// check whether caught up
if (
// if a broadcast received during catchup,
(lastBroadcast != null &&
// and createdAfter is at or after last broadcast
createdAfter.compareTo(lastBroadcast.created) >= 0)
// or no additional products returned
|| (lastBroadcast == null && count == 0)
) {
// caught up
LOGGER.info("[" + getName() + "] Caught up, switching to broadcast");
processBroadcast = true;
} else {
// keep catching up
startCatchUp();
}
}
/**
* Catch up process.
*
* Do not run directly, use {@link #startCatchUpThread()} and
* {@link #stopCatchUpThread()} to start and stop the process.
*
* Process waits until {@link #startCatchUp()} is called,
* and uses {@link #throttleQueues()} between sends.
*/
@Override
public void run() {
while (catchUpThreadRunning) {
try {
synchronized (catchUpSync) {
if (!catchUpRunning) {
catchUpSync.wait();
continue;
}
if (lastCatchUpSent != null) {
// message already sent, wait for timeout
Instant now = Instant.now();
Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
if (now.isBefore(timeout)) {
catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
continue;
} else {
// timed out
LOGGER.warning("No products_created_after response"
+ ", sent at " + lastCatchUpSent.toString());
// fall through
}
}
}
// ready to send, but block until done throttling
throttleQueues();
try {
synchronized (catchUpSync) {
// connection may have closed while throttling
if (!catchUpRunning) {
continue;
}
sendProductsCreatedAfter();
// track when sent
lastCatchUpSent = Instant.now();
}
} catch (Exception e){
LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
if (catchUpThreadRunning && catchUpRunning) {
// wait before next attempt
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
// probably stopping
}
}
}
/**
* Send an "action"="products_created_after" request, which is part of the
* catch up process.
*
* The server will reply with zero or more "action"="product" messages, and
* then one "action"="products_created_after" message to indicate the request
* is complete.
*
* @throws IOException IOException
*/
protected void sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(
Math.round(initialCatchUpAge * 86400));
}
String request = Json.createObjectBuilder()
.add("action", "products_created_after")
.add("created_after", this.createdAfter.toString())
.build()
.toString();
LOGGER.fine("[" + getName() + "] Sending " + request);
// start catch up process
this.session.getBasicRemote().sendText(request);
}
/**
* Notify running background thread to start catch up process.
*/
protected void startCatchUp() {
// notify background thread to start catch up
synchronized (catchUpSync) {
catchUpRunning = true;
// clear sent time
lastCatchUpSent = null;
catchUpSync.notify();
}
}
/**
* Start background thread for catch up process.
*/
protected void startCatchUpThread() {
if (catchUpThread != null) {
throw new IllegalStateException("catchUp thread already started");
}
synchronized (catchUpSync) {
catchUpThreadRunning = true;
catchUpThread = new Thread(this);
}
catchUpThread.start();
}
/**
* Notify running background thread to stop catch up process.
*/
protected void stopCatchUp() {
synchronized (catchUpSync) {
// stop catch up
catchUpRunning = false;
// clear sent time
lastCatchUpSent = null;
catchUpSync.notify();
}
}
/**
* Stop background thread for catch up process.
*/
protected void stopCatchUpThread() {
if (catchUpThread == null) {
return;
}
// stop catch up thread
try {
synchronized (catchUpSync) {
// orderly shutdown
catchUpThreadRunning = false;
catchUpSync.notify();
}
// interrupt just in case
catchUpThread.interrupt();
catchUpThread.join();
} catch (Exception e) {
LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
} finally {
catchUpThread = null;
}
}
@Override
public void onConnectFail() {
// client failed to connect
LOGGER.info("[" + getName() + "] onConnectFail");
}
@Override
public void onReconnectFail() {
// failed to reconnect after close
LOGGER.info("[" + getName() + "] onReconnectFail");
}
/**
* Reads createdAfter from a tracking file if it exists,
* then connects to web socket.
*
* @throws Exception Exception
*/
@Override
public void startup() throws Exception{
super.startup();
if (trackingIndex == null) {
trackingIndex = new TrackingIndex();
}
trackingIndex.startup();
//read sequence from tracking file if other parameters agree
JsonObject json = readTrackingData();
if (json != null && json.getString(URI_PROPERTY).equals(uri.toString())) {
createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
}
// open websocket
client = new WebSocketClient(uri, this, attempts, timeout, true);
// start catch up process
startCatchUpThread();
}
/**
* Closes web socket
* @throws Exception Exception
*/
@Override
public void shutdown() throws Exception {
// stop catch up process
stopCatchUpThread();
// close socket
try {
client.shutdown();
} catch (Exception e) {}
super.shutdown();
}
/**
* Reads tracking file.
*
* @return JsonObject tracking file
* @throws Exception Exception
*/
public JsonObject readTrackingData() throws Exception {
// use name as key
return trackingIndex.getTrackingData(getName());
}
/**
* Writes tracking file.
*
* @throws Exception Exception
*/
public void writeTrackingData() throws Exception {
JsonObject json = Json.createObjectBuilder()
.add(URI_PROPERTY, uri.toString())
.add(CREATED_AFTER_PROPERTY, createdAfter.toString())
.build();
// use name as key
trackingIndex.setTrackingData(getName(), json);
}
/**
* Getter for URI
* @return URI
*/
public URI getURI() {
return uri;
}
/**
* Setter for URI
* @param uri URI
*/
public void setURI(final URI uri) {
this.uri = uri;
}
/**
* Getter for trackingFileName
* @return name of tracking file
*/
public String getTrackingFileName() {
return trackingFileName;
}
/**
* Setter for trackingFileName
* @param trackingFileName trackingFileName
*/
public void setTrackingFileName(final String trackingFileName) {
this.trackingFileName = trackingFileName;
}
/**
* Getter for createdAfter
* @return createdAfter
*/
public Instant getCreatedAfter() {
return createdAfter;
}
/**
* Setter for createdAfter
* @param createdAfter createdAfter
*/
public void setCreatedAfter(final Instant createdAfter) {
this.createdAfter = createdAfter;
}
/**
* Getter for attempts
* @return attempts
*/
public int getAttempts() {
return attempts;
}
/**
* Setter for attempts
* @param attempts attempts
*/
public void setAttempts(final int attempts) {
this.attempts = attempts;
}
/**
* Getter for timeout
* @return timeout
*/
public long getTimeout() {
return timeout;
}
/**
* Setter for timeout
* @param timeout long timeout
*/
public void setTimeout(final long timeout) {
this.timeout = timeout;
}
}