Package gov.usgs.earthquake.aws
Class AwsProductReceiver
- java.lang.Object
-
- gov.usgs.util.DefaultConfigurable
-
- gov.usgs.earthquake.distribution.DefaultNotificationReceiver
-
- gov.usgs.earthquake.aws.AwsProductReceiver
-
- All Implemented Interfaces:
NotificationIndexCleanup.Listener
,NotificationReceiver
,WebSocketListener
,Configurable
,java.lang.Runnable
public class AwsProductReceiver extends DefaultNotificationReceiver implements java.lang.Runnable, WebSocketListener
Receives notifications from a PDL notification web socket. After initial connection, ignores broadcasts until catch up process is complete. Catch up involves sending a "products_created_after" request with the latest notification "created" timestamp, and processing products until either the last product matches the last broadcast or there are no more products after the latest notification "created" timestamp.
-
-
Field Summary
Fields Modifier and Type Field Description protected boolean
catchUpRunning
whether currenting catching up.protected java.lang.Object
catchUpSync
sync object for catchUp state.protected java.lang.Thread
catchUpThread
thread where catch up process runs.protected boolean
catchUpThreadRunning
whether thread should continue running (shutdown flag)static java.lang.String
CONNECT_ATTEMPTS_PROPERTY
Variable for connectAttempts stringstatic java.lang.String
CONNECT_TIMEOUT_PROPERTY
Variable for connectTimeout stringstatic java.lang.String
CREATED_AFTER_PROPERTY
Variable for createdAfter stringprotected java.time.Instant
createdAfter
µs timestamp of last message that has been processedstatic java.lang.String
DEFAULT_CONNECT_ATTEMPTS
Variable for connect attempts.static java.lang.String
DEFAULT_CONNECT_TIMEOUT
Variable for timout.static java.lang.String
DEFAULT_INITIAL_CATCHUP_AGE
Variable for catchup age.static java.lang.String
DEFAULT_TRACKING_FILE_NAME
Variable for tracking file.static java.lang.String
INITIAL_CATCHUP_AGE_PROPERTY
Variable for initialCatchUpAge stringprotected double
initialCatchUpAge
How far back to check when first connecting.protected JsonNotification
lastBroadcast
last broadcast message that has been processed (used for catch up)protected java.lang.Long
lastBroadcastId
ID of the previously mentioned last broadcastprotected java.time.Instant
lastCatchUpSent
last catch up message sent (for response timeouts)static java.util.logging.Logger
LOGGER
Initialzation of logger.protected boolean
processBroadcast
whether to process broadcast messages (after catching up).static java.lang.String
TRACKING_FILE_NAME_PROPERTY
Variable for trackingFileName stringstatic java.lang.String
TRACKING_INDEX_PROPERTY
Variable for trackingIndex stringstatic java.lang.String
URI_PROPERTY
Variable for URI string-
Fields inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
DEFAULT_PRODUCT_STORAGE_MAX_AGE, DEFAULT_READ_TIMEOUT, DEFAULT_RECEIVER_CLEANUP, EXECUTOR_LISTENER_NOTIFIER, FUTURE_LISTENER_NOTIFIER, INDEX_FILE_PROPERTY, LISTENER_NOTIFIER_PROPERTY, NOTIFICATION_INDEX_PROPERTY, PRODUCT_STORAGE_MAX_AGE_PROPERTY, PRODUCT_STORAGE_PROPERTY, READ_TIMEOUT_PROPERTY, RECEIVER_CLEANUP_PROPERTY, ROUNDROBIN_LISTENER_NOTIFIER, STORAGE_DIRECTORY_PROPERTY
-
-
Constructor Summary
Constructors Constructor Description AwsProductReceiver()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
configure(Config config)
Process configuration settings.int
getAttempts()
Getter for attemptsjava.time.Instant
getCreatedAfter()
Getter for createdAfterlong
getTimeout()
Getter for timeoutjava.lang.String
getTrackingFileName()
Getter for trackingFileNamejava.net.URI
getURI()
Getter for URIprotected void
onBroadcast(javax.json.JsonObject json)
Handle a message with "action"="broadcast".void
onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason)
Called when connection is closed, either because shutdown on this end or closed by server.void
onConnectFail()
Interface method to be overriden by WebSocket files and AwsProductReceiverprotected void
onJsonNotification(JsonNotification notification)
Process a received notification and update current "created" timestamp.void
onMessage(java.lang.String message)
Message handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling.void
onOpen(javax.websocket.Session session)
Called when connection is first opened.protected void
onProduct(javax.json.JsonObject json)
Handle a message with "action"="product", which is received during catch up.protected void
onProductsCreatedAfter(javax.json.JsonObject json)
Handle a message with "action"="products_created_after", which is received during catch up.void
onReconnectFail()
Interface method to be overriden by WebSocket files and AwsProductReceiverjavax.json.JsonObject
readTrackingData()
Reads tracking file.void
run()
Catch up process.protected void
sendProductsCreatedAfter()
Send an "action"="products_created_after" request, which is part of the catch up process.void
setAttempts(int attempts)
Setter for attemptsvoid
setCreatedAfter(java.time.Instant createdAfter)
Setter for createdAftervoid
setTimeout(long timeout)
Setter for timeoutvoid
setTrackingFileName(java.lang.String trackingFileName)
Setter for trackingFileNamevoid
setURI(java.net.URI uri)
Setter for URIvoid
shutdown()
Closes web socketprotected void
startCatchUp()
Notify running background thread to start catch up process.protected void
startCatchUpThread()
Start background thread for catch up process.void
startup()
Reads createdAfter from a tracking file if it exists, then connects to web socket.protected void
stopCatchUp()
Notify running background thread to stop catch up process.protected void
stopCatchUpThread()
Stop background thread for catch up process.void
writeTrackingData()
Writes tracking file.-
Methods inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
addNotificationListener, getConnectTimeout, getListenerQueueStatus, getNotificationIndex, getNotifier, getProductStorage, getProductStorageMaxAge, getQueueStatus, getReadTimeout, getReceiverCleanupInterval, notifyListeners, onExpiredNotification, receiveNotification, removeExpiredNotifications, removeNotificationListener, retrieveProduct, sendNotifications, setConnectTimeout, setNotificationIndex, setNotifier, setProductStorage, setProductStorageMaxAge, setReadTimeout, setReceiverCleanupInterval, storeProductSource, throttleQueues
-
Methods inherited from class gov.usgs.util.DefaultConfigurable
getName, setName
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface gov.usgs.util.Configurable
getName, setName
-
-
-
-
Field Detail
-
LOGGER
public static final java.util.logging.Logger LOGGER
Initialzation of logger. For us later in file.
-
URI_PROPERTY
public static final java.lang.String URI_PROPERTY
Variable for URI string- See Also:
- Constant Field Values
-
CREATED_AFTER_PROPERTY
public static final java.lang.String CREATED_AFTER_PROPERTY
Variable for createdAfter string- See Also:
- Constant Field Values
-
TRACKING_INDEX_PROPERTY
public static final java.lang.String TRACKING_INDEX_PROPERTY
Variable for trackingIndex string- See Also:
- Constant Field Values
-
TRACKING_FILE_NAME_PROPERTY
public static final java.lang.String TRACKING_FILE_NAME_PROPERTY
Variable for trackingFileName string- See Also:
- Constant Field Values
-
CONNECT_ATTEMPTS_PROPERTY
public static final java.lang.String CONNECT_ATTEMPTS_PROPERTY
Variable for connectAttempts string- See Also:
- Constant Field Values
-
CONNECT_TIMEOUT_PROPERTY
public static final java.lang.String CONNECT_TIMEOUT_PROPERTY
Variable for connectTimeout string- See Also:
- Constant Field Values
-
INITIAL_CATCHUP_AGE_PROPERTY
public static final java.lang.String INITIAL_CATCHUP_AGE_PROPERTY
Variable for initialCatchUpAge string- See Also:
- Constant Field Values
-
DEFAULT_TRACKING_FILE_NAME
public static final java.lang.String DEFAULT_TRACKING_FILE_NAME
Variable for tracking file. Links to data/AwsReceiver.json- See Also:
- Constant Field Values
-
DEFAULT_CONNECT_ATTEMPTS
public static final java.lang.String DEFAULT_CONNECT_ATTEMPTS
Variable for connect attempts. Set to 5- See Also:
- Constant Field Values
-
DEFAULT_CONNECT_TIMEOUT
public static final java.lang.String DEFAULT_CONNECT_TIMEOUT
Variable for timout. Set to 1000- See Also:
- Constant Field Values
-
DEFAULT_INITIAL_CATCHUP_AGE
public static final java.lang.String DEFAULT_INITIAL_CATCHUP_AGE
Variable for catchup age. Set to 7.0- See Also:
- Constant Field Values
-
createdAfter
protected java.time.Instant createdAfter
µs timestamp of last message that has been processed
-
initialCatchUpAge
protected double initialCatchUpAge
How far back to check when first connecting.
-
lastBroadcast
protected JsonNotification lastBroadcast
last broadcast message that has been processed (used for catch up)
-
lastBroadcastId
protected java.lang.Long lastBroadcastId
ID of the previously mentioned last broadcast
-
processBroadcast
protected boolean processBroadcast
whether to process broadcast messages (after catching up).
-
catchUpRunning
protected boolean catchUpRunning
whether currenting catching up.
-
catchUpSync
protected final java.lang.Object catchUpSync
sync object for catchUp state.
-
catchUpThread
protected java.lang.Thread catchUpThread
thread where catch up process runs.
-
catchUpThreadRunning
protected boolean catchUpThreadRunning
whether thread should continue running (shutdown flag)
-
lastCatchUpSent
protected java.time.Instant lastCatchUpSent
last catch up message sent (for response timeouts)
-
-
Method Detail
-
configure
public void configure(Config config) throws java.lang.Exception
Description copied from class:DefaultConfigurable
Process configuration settings. Called before startup().- Specified by:
configure
in interfaceConfigurable
- Overrides:
configure
in classDefaultNotificationReceiver
- Parameters:
config
- the Config object with settings.- Throws:
java.lang.Exception
- if configuration exceptions occur.
-
onOpen
public void onOpen(javax.websocket.Session session) throws java.io.IOException
Called when connection is first opened. Start catch up process.- Specified by:
onOpen
in interfaceWebSocketListener
- Parameters:
session
- Session to open- Throws:
java.io.IOException
- IOException
-
onClose
public void onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason)
Called when connection is closed, either because shutdown on this end or closed by server.- Specified by:
onClose
in interfaceWebSocketListener
- Parameters:
session
- Session to closecloseReason
- Reason for closing session
-
onMessage
public void onMessage(java.lang.String message) throws java.io.IOException
Message handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares state of latest product to determine whether caught up and if broadcasts should be processed.- Specified by:
onMessage
in interfaceWebSocketListener
- Parameters:
message
- Message notification - string- Throws:
java.io.IOException
- IOException
-
onBroadcast
protected void onBroadcast(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="broadcast". If caught up process notification as usual, otherwise save notification to help detect when caught up.- Parameters:
json
- JSON Message- Throws:
java.lang.Exception
- Exception
-
onJsonNotification
protected void onJsonNotification(JsonNotification notification) throws java.lang.Exception
Process a received notification and update current "created" timestamp.- Parameters:
notification
- JSON Notification- Throws:
java.lang.Exception
- Exception
-
onProduct
protected void onProduct(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="product", which is received during catch up.- Parameters:
json
- JSON Message- Throws:
java.lang.Exception
- Exception
-
onProductsCreatedAfter
protected void onProductsCreatedAfter(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Check whether caught up, and either switch to broadcast mode or continue catch up process.- Parameters:
json
- JSON Message- Throws:
java.lang.Exception
- Exception
-
run
public void run()
Catch up process. Do not run directly, usestartCatchUpThread()
andstopCatchUpThread()
to start and stop the process. Process waits untilstartCatchUp()
is called, and usesDefaultNotificationReceiver.throttleQueues()
between sends.- Specified by:
run
in interfacejava.lang.Runnable
-
sendProductsCreatedAfter
protected void sendProductsCreatedAfter() throws java.io.IOException
Send an "action"="products_created_after" request, which is part of the catch up process. The server will reply with zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.- Throws:
java.io.IOException
- IOException
-
startCatchUp
protected void startCatchUp()
Notify running background thread to start catch up process.
-
startCatchUpThread
protected void startCatchUpThread()
Start background thread for catch up process.
-
stopCatchUp
protected void stopCatchUp()
Notify running background thread to stop catch up process.
-
stopCatchUpThread
protected void stopCatchUpThread()
Stop background thread for catch up process.
-
onConnectFail
public void onConnectFail()
Description copied from interface:WebSocketListener
Interface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onConnectFail
in interfaceWebSocketListener
-
onReconnectFail
public void onReconnectFail()
Description copied from interface:WebSocketListener
Interface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onReconnectFail
in interfaceWebSocketListener
-
startup
public void startup() throws java.lang.Exception
Reads createdAfter from a tracking file if it exists, then connects to web socket.- Specified by:
startup
in interfaceConfigurable
- Overrides:
startup
in classDefaultNotificationReceiver
- Throws:
java.lang.Exception
- Exception
-
shutdown
public void shutdown() throws java.lang.Exception
Closes web socket- Specified by:
shutdown
in interfaceConfigurable
- Overrides:
shutdown
in classDefaultNotificationReceiver
- Throws:
java.lang.Exception
- Exception
-
readTrackingData
public javax.json.JsonObject readTrackingData() throws java.lang.Exception
Reads tracking file.- Returns:
- JsonObject tracking file
- Throws:
java.lang.Exception
- Exception
-
writeTrackingData
public void writeTrackingData() throws java.lang.Exception
Writes tracking file.- Throws:
java.lang.Exception
- Exception
-
getURI
public java.net.URI getURI()
Getter for URI- Returns:
- URI
-
setURI
public void setURI(java.net.URI uri)
Setter for URI- Parameters:
uri
- URI
-
getTrackingFileName
public java.lang.String getTrackingFileName()
Getter for trackingFileName- Returns:
- name of tracking file
-
setTrackingFileName
public void setTrackingFileName(java.lang.String trackingFileName)
Setter for trackingFileName- Parameters:
trackingFileName
- trackingFileName
-
getCreatedAfter
public java.time.Instant getCreatedAfter()
Getter for createdAfter- Returns:
- createdAfter
-
setCreatedAfter
public void setCreatedAfter(java.time.Instant createdAfter)
Setter for createdAfter- Parameters:
createdAfter
- createdAfter
-
getAttempts
public int getAttempts()
Getter for attempts- Returns:
- attempts
-
setAttempts
public void setAttempts(int attempts)
Setter for attempts- Parameters:
attempts
- attempts
-
getTimeout
public long getTimeout()
Getter for timeout- Returns:
- timeout
-
setTimeout
public void setTimeout(long timeout)
Setter for timeout- Parameters:
timeout
- long timeout
-
-