Package gov.usgs.earthquake.aws
Class AwsProductReceiver
- java.lang.Object
  - gov.usgs.util.DefaultConfigurable
    - gov.usgs.earthquake.distribution.DefaultNotificationReceiver
      - gov.usgs.earthquake.aws.AwsProductReceiver
All Implemented Interfaces:
NotificationIndexCleanup.Listener, NotificationReceiver, WebSocketListener, Configurable, java.lang.Runnable
public class AwsProductReceiver extends DefaultNotificationReceiver implements java.lang.Runnable, WebSocketListener
Receives notifications from a PDL notification web socket. After initial connection, ignores broadcasts until catch up process is complete. Catch up involves sending a "products_created_after" request with the latest notification "created" timestamp, and processing products until either the last product matches the last broadcast or there are no more products after the latest notification "created" timestamp.
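The rule for deciding when catch up is complete can be sketched as follows. This is a minimal illustration of the two conditions described above, not the class's actual implementation: `CatchUpSketch`, `isCaughtUp`, and its parameters are hypothetical names, and product identity is reduced to a numeric id.

```java
public class CatchUpSketch {
    /**
     * Decide whether catch up is complete after one "products_created_after" response.
     *
     * @param productsInResponse number of products the response contained (hypothetical input)
     * @param lastProductId id of the last product processed, or null if none
     * @param lastBroadcastId id of the last broadcast seen, or null if none
     */
    public static boolean isCaughtUp(int productsInResponse, Long lastProductId, Long lastBroadcastId) {
        // Condition 1: no more products after the latest "created" timestamp.
        if (productsInResponse == 0) {
            return true;
        }
        // Condition 2: the last product processed matches the last broadcast.
        return lastProductId != null && lastProductId.equals(lastBroadcastId);
    }

    public static void main(String[] args) {
        System.out.println(isCaughtUp(0, null, 42L)); // empty response
        System.out.println(isCaughtUp(3, 42L, 42L));  // last product matches last broadcast
        System.out.println(isCaughtUp(3, 41L, 42L));  // still behind, keep catching up
    }
}
```

When neither condition holds, the receiver would send another "products_created_after" request using the updated "created" timestamp.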
-
-
Field Summary
Fields
Modifier and Type | Field | Description
protected boolean | catchUpRunning | Whether currently catching up.
protected java.lang.Object | catchUpSync | Sync object for catchUp state.
protected java.lang.Thread | catchUpThread | Thread where catch up process runs.
protected boolean | catchUpThreadRunning | Whether thread should continue running (shutdown flag).
static java.lang.String | CONNECT_ATTEMPTS_PROPERTY | Variable for connectAttempts string.
static java.lang.String | CONNECT_TIMEOUT_PROPERTY | Variable for connectTimeout string.
static java.lang.String | CREATED_AFTER_PROPERTY | Variable for createdAfter string.
protected java.time.Instant | createdAfter | µs timestamp of last message that has been processed.
static java.lang.String | DEFAULT_CONNECT_ATTEMPTS | Variable for connect attempts.
static java.lang.String | DEFAULT_CONNECT_TIMEOUT | Variable for timeout.
static java.lang.String | DEFAULT_INITIAL_CATCHUP_AGE | Variable for catchup age.
static java.lang.String | DEFAULT_TRACKING_FILE_NAME | Variable for tracking file.
static java.lang.String | INITIAL_CATCHUP_AGE_PROPERTY | Variable for initialCatchUpAge string.
protected double | initialCatchUpAge | How far back to check when first connecting.
protected JsonNotification | lastBroadcast | Last broadcast message that has been processed (used for catch up).
protected java.lang.Long | lastBroadcastId | ID of the last broadcast (see lastBroadcast).
protected java.time.Instant | lastCatchUpSent | Last catch up message sent (for response timeouts).
static java.util.logging.Logger | LOGGER | Initialization of logger.
protected boolean | processBroadcast | Whether to process broadcast messages (after catching up).
static java.lang.String | TRACKING_FILE_NAME_PROPERTY | Variable for trackingFileName string.
static java.lang.String | TRACKING_INDEX_PROPERTY | Variable for trackingIndex string.
static java.lang.String | URI_PROPERTY | Variable for URI string.
Fields inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
DEFAULT_PRODUCT_STORAGE_MAX_AGE, DEFAULT_READ_TIMEOUT, DEFAULT_RECEIVER_CLEANUP, EXECUTOR_LISTENER_NOTIFIER, FUTURE_LISTENER_NOTIFIER, INDEX_FILE_PROPERTY, LISTENER_NOTIFIER_PROPERTY, NOTIFICATION_INDEX_PROPERTY, PRODUCT_STORAGE_MAX_AGE_PROPERTY, PRODUCT_STORAGE_PROPERTY, READ_TIMEOUT_PROPERTY, RECEIVER_CLEANUP_PROPERTY, ROUNDROBIN_LISTENER_NOTIFIER, STORAGE_DIRECTORY_PROPERTY
-
-
Constructor Summary
Constructors
AwsProductReceiver()
-
Method Summary
Modifier and Type | Method | Description
void | configure(Config config) | Process configuration settings.
int | getAttempts() | Getter for attempts.
java.time.Instant | getCreatedAfter() | Getter for createdAfter.
long | getTimeout() | Getter for timeout.
java.lang.String | getTrackingFileName() | Getter for trackingFileName.
java.net.URI | getURI() | Getter for URI.
protected void | onBroadcast(javax.json.JsonObject json) | Handle a message with "action"="broadcast".
void | onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason) | Called when the connection is closed, either because of a shutdown on this end or because the server closed it.
void | onConnectFail() | Interface method to be overridden by WebSocket files and AwsProductReceiver.
protected void | onJsonNotification(JsonNotification notification) | Process a received notification and update current "created" timestamp.
void | onMessage(java.lang.String message) | Message handler function passed to WebSocketClient. Parses the message as JSON, and checks the "action" property to route the message for handling.
void | onOpen(javax.websocket.Session session) | Called when connection is first opened.
protected void | onProduct(javax.json.JsonObject json) | Handle a message with "action"="product", which is received during catch up.
protected void | onProductsCreatedAfter(javax.json.JsonObject json) | Handle a message with "action"="products_created_after", which is received during catch up.
void | onReconnectFail() | Interface method to be overridden by WebSocket files and AwsProductReceiver.
javax.json.JsonObject | readTrackingData() | Reads tracking file.
void | run() | Catch up process.
protected void | sendProductsCreatedAfter() | Send an "action"="products_created_after" request, which is part of the catch up process.
void | setAttempts(int attempts) | Setter for attempts.
void | setCreatedAfter(java.time.Instant createdAfter) | Setter for createdAfter.
void | setTimeout(long timeout) | Setter for timeout.
void | setTrackingFileName(java.lang.String trackingFileName) | Setter for trackingFileName.
void | setURI(java.net.URI uri) | Setter for URI.
void | shutdown() | Closes web socket.
protected void | startCatchUp() | Notify running background thread to start catch up process.
protected void | startCatchUpThread() | Start background thread for catch up process.
void | startup() | Reads createdAfter from a tracking file if it exists, then connects to web socket.
protected void | stopCatchUp() | Notify running background thread to stop catch up process.
protected void | stopCatchUpThread() | Stop background thread for catch up process.
void | writeTrackingData() | Writes tracking file.
Methods inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
addNotificationListener, getConnectTimeout, getListenerQueueStatus, getNotificationIndex, getNotifier, getProductStorage, getProductStorageMaxAge, getQueueStatus, getReadTimeout, getReceiverCleanupInterval, notifyListeners, onExpiredNotification, receiveNotification, removeExpiredNotifications, removeNotificationListener, retrieveProduct, sendNotifications, setConnectTimeout, setNotificationIndex, setNotifier, setProductStorage, setProductStorageMaxAge, setReadTimeout, setReceiverCleanupInterval, storeProductSource, throttleQueues
-
Methods inherited from class gov.usgs.util.DefaultConfigurable
getName, setName
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface gov.usgs.util.Configurable
getName, setName
-
-
-
-
Field Detail
-
LOGGER
public static final java.util.logging.Logger LOGGER
Initialization of logger, for use later in the file.
-
URI_PROPERTY
public static final java.lang.String URI_PROPERTY
Variable for URI string.
See Also: Constant Field Values
-
CREATED_AFTER_PROPERTY
public static final java.lang.String CREATED_AFTER_PROPERTY
Variable for createdAfter string.
See Also: Constant Field Values
-
TRACKING_INDEX_PROPERTY
public static final java.lang.String TRACKING_INDEX_PROPERTY
Variable for trackingIndex string.
See Also: Constant Field Values
-
TRACKING_FILE_NAME_PROPERTY
public static final java.lang.String TRACKING_FILE_NAME_PROPERTY
Variable for trackingFileName string.
See Also: Constant Field Values
-
CONNECT_ATTEMPTS_PROPERTY
public static final java.lang.String CONNECT_ATTEMPTS_PROPERTY
Variable for connectAttempts string.
See Also: Constant Field Values
-
CONNECT_TIMEOUT_PROPERTY
public static final java.lang.String CONNECT_TIMEOUT_PROPERTY
Variable for connectTimeout string.
See Also: Constant Field Values
-
INITIAL_CATCHUP_AGE_PROPERTY
public static final java.lang.String INITIAL_CATCHUP_AGE_PROPERTY
Variable for initialCatchUpAge string.
See Also: Constant Field Values
-
DEFAULT_TRACKING_FILE_NAME
public static final java.lang.String DEFAULT_TRACKING_FILE_NAME
Default tracking file name. Set to data/AwsReceiver.json.
See Also: Constant Field Values
-
DEFAULT_CONNECT_ATTEMPTS
public static final java.lang.String DEFAULT_CONNECT_ATTEMPTS
Default number of connect attempts. Set to 5.
See Also: Constant Field Values
-
DEFAULT_CONNECT_TIMEOUT
public static final java.lang.String DEFAULT_CONNECT_TIMEOUT
Default connect timeout. Set to 1000.
See Also: Constant Field Values
-
DEFAULT_INITIAL_CATCHUP_AGE
public static final java.lang.String DEFAULT_INITIAL_CATCHUP_AGE
Default initial catch up age. Set to 7.0.
See Also: Constant Field Values
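Because the defaults above are declared as strings, a configuration reader has to parse them into the types returned by getAttempts() (int) and getTimeout() (long) and stored in initialCatchUpAge (double). The sketch below shows one way that could look using java.util.Properties. The property keys are assumptions inferred from the field descriptions (the real values are listed under Constant Field Values), and `ReceiverDefaultsSketch` is a hypothetical class, not part of PDL.

```java
import java.util.Properties;

public class ReceiverDefaultsSketch {
    // Default values as documented above; declared as strings, like the real constants.
    static final String DEFAULT_CONNECT_ATTEMPTS = "5";
    static final String DEFAULT_CONNECT_TIMEOUT = "1000";
    static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";

    // ASSUMED property keys; check Constant Field Values for the real ones.
    static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
    static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
    static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";

    public static int connectAttempts(Properties props) {
        return Integer.parseInt(props.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
    }

    public static long connectTimeout(Properties props) {
        return Long.parseLong(props.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
    }

    public static double initialCatchUpAge(Properties props) {
        return Double.parseDouble(props.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(CONNECT_ATTEMPTS_PROPERTY, "3"); // override one setting only
        System.out.println(connectAttempts(props));   // configured value
        System.out.println(connectTimeout(props));    // falls back to the default
        System.out.println(initialCatchUpAge(props)); // falls back to the default
    }
}
```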
-
createdAfter
protected java.time.Instant createdAfter
µs timestamp of last message that has been processed
-
initialCatchUpAge
protected double initialCatchUpAge
How far back to check when first connecting.
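As an illustration of how such an age could be turned into a starting createdAfter value when no tracking data exists, here is a minimal sketch. It assumes the age is expressed in days; the unit is not stated above, so treat that as a guess. `InitialCatchUpSketch` and `initialCreatedAfter` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

public class InitialCatchUpSketch {
    /**
     * Compute the earliest "created" timestamp to request on first connect.
     * ASSUMPTION: ageDays is a (possibly fractional) number of days.
     */
    public static Instant initialCreatedAfter(Instant now, double ageDays) {
        long millis = Math.round(ageDays * 24 * 60 * 60 * 1000);
        return now.minus(Duration.ofMillis(millis));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-08T00:00:00Z");
        // With the documented default of 7.0, this looks back one week.
        System.out.println(initialCreatedAfter(now, 7.0)); // prints 2024-01-01T00:00:00Z
    }
}
```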
-
lastBroadcast
protected JsonNotification lastBroadcast
last broadcast message that has been processed (used for catch up)
-
lastBroadcastId
protected java.lang.Long lastBroadcastId
ID of the last broadcast (see lastBroadcast).
-
processBroadcast
protected boolean processBroadcast
whether to process broadcast messages (after catching up).
-
catchUpRunning
protected boolean catchUpRunning
Whether currently catching up.
-
catchUpSync
protected final java.lang.Object catchUpSync
sync object for catchUp state.
-
catchUpThread
protected java.lang.Thread catchUpThread
thread where catch up process runs.
-
catchUpThreadRunning
protected boolean catchUpThreadRunning
whether thread should continue running (shutdown flag)
-
lastCatchUpSent
protected java.time.Instant lastCatchUpSent
last catch up message sent (for response timeouts)
-
-
Method Detail
-
configure
public void configure(Config config) throws java.lang.Exception
Description copied from class: DefaultConfigurable
Process configuration settings. Called before startup().
Specified by: configure in interface Configurable
Overrides: configure in class DefaultNotificationReceiver
Parameters: config - the Config object with settings.
Throws: java.lang.Exception - if configuration exceptions occur.
-
onOpen
public void onOpen(javax.websocket.Session session) throws java.io.IOException
Called when connection is first opened. Starts the catch up process.
Specified by: onOpen in interface WebSocketListener
Parameters: session - the session that was opened
Throws: java.io.IOException - if an I/O error occurs
-
onClose
public void onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason)
Called when the connection is closed, either because of a shutdown on this end or because the server closed it.
Specified by: onClose in interface WebSocketListener
Parameters:
session - the session that was closed
closeReason - reason for closing the session
-
onMessage
public void onMessage(java.lang.String message) throws java.io.IOException
Message handler function passed to WebSocketClient. Parses the message as JSON, and checks the "action" property to route the message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares the state of the latest product to determine whether the receiver is caught up and whether broadcasts should be processed.
Specified by: onMessage in interface WebSocketListener
Parameters: message - the notification message, as a string
Throws: java.io.IOException - if an I/O error occurs
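The routing step can be pictured as a switch on the "action" value. The sketch below is dependency-free, so it assumes the action string has already been extracted from the parsed JSON; it returns the name of the handler that would be invoked rather than calling it. `ActionRouterSketch` is hypothetical, and ignoring unknown actions is an assumption, not documented behavior.

```java
public class ActionRouterSketch {
    /** Map an "action" value to the name of the handler documented in this class. */
    public static String route(String action) {
        if (action == null) {
            return "ignored"; // ASSUMPTION: messages without an action are skipped
        }
        switch (action) {
            case "broadcast":
                return "onBroadcast";
            case "product":
                return "onProduct";
            case "products_created_after":
                return "onProductsCreatedAfter";
            default:
                return "ignored"; // ASSUMPTION: unknown actions are skipped
        }
    }

    /** Synchronized so that messages are handled one at a time, in arrival order. */
    public static synchronized String onMessage(String action) {
        return route(action);
    }

    public static void main(String[] args) {
        System.out.println(onMessage("broadcast"));
        System.out.println(onMessage("product"));
        System.out.println(onMessage("products_created_after"));
    }
}
```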
-
onBroadcast
protected void onBroadcast(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="broadcast". If caught up, process the notification as usual; otherwise save the notification to help detect when caught up.
Parameters: json - JSON message
Throws: java.lang.Exception - if an error occurs
-
onJsonNotification
protected void onJsonNotification(JsonNotification notification) throws java.lang.Exception
Process a received notification and update current "created" timestamp.
Parameters: notification - JSON notification
Throws: java.lang.Exception - if an error occurs
-
onProduct
protected void onProduct(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="product", which is received during catch up.
Parameters: json - JSON message
Throws: java.lang.Exception - if an error occurs
-
onProductsCreatedAfter
protected void onProductsCreatedAfter(javax.json.JsonObject json) throws java.lang.Exception
Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Checks whether caught up, and either switches to broadcast mode or continues the catch up process.
Parameters: json - JSON message
Throws: java.lang.Exception - if an error occurs
-
run
public void run()
Catch up process. Do not run directly; use startCatchUpThread() and stopCatchUpThread() to start and stop the process. The process waits until startCatchUp() is called, and uses DefaultNotificationReceiver.throttleQueues() between sends.
Specified by: run in interface java.lang.Runnable
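The interplay between the background thread, the sync object, and the two flags can be sketched with plain wait/notifyAll. This is a simplified illustration, not the actual implementation: each start signal triggers exactly one simulated send, throttling and response timeouts are left out, and `CatchUpThreadSketch`, `sends`, and `awaitSends` are hypothetical.

```java
public class CatchUpThreadSketch implements Runnable {
    private final Object catchUpSync = new Object();
    private boolean catchUpRunning = false;       // a catch up pass was requested
    private boolean catchUpThreadRunning = false; // shutdown flag
    private Thread catchUpThread;
    private int sends = 0; // stands in for "products_created_after" requests

    public void startCatchUpThread() {
        synchronized (catchUpSync) {
            catchUpThreadRunning = true;
        }
        catchUpThread = new Thread(this, "catch-up-sketch");
        catchUpThread.start();
    }

    public void stopCatchUpThread() {
        synchronized (catchUpSync) {
            catchUpThreadRunning = false;
            catchUpSync.notifyAll(); // wake the thread so it can see the flag
        }
        try {
            catchUpThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void startCatchUp() {
        synchronized (catchUpSync) {
            catchUpRunning = true;
            catchUpSync.notifyAll();
        }
    }

    @Override
    public void run() {
        synchronized (catchUpSync) {
            while (catchUpThreadRunning) {
                if (!catchUpRunning) {
                    try {
                        catchUpSync.wait(); // releases the monitor until notified
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }
                sends++;                 // simulated request
                catchUpRunning = false;  // wait for the next start signal
                catchUpSync.notifyAll(); // wake anyone waiting in awaitSends
            }
        }
    }

    /** Block until at least {@code expected} simulated sends happened. */
    public int awaitSends(int expected) {
        synchronized (catchUpSync) {
            while (sends < expected) {
                try {
                    catchUpSync.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return sends;
        }
    }

    public static void main(String[] args) {
        CatchUpThreadSketch sketch = new CatchUpThreadSketch();
        sketch.startCatchUpThread();
        sketch.startCatchUp();
        System.out.println(sketch.awaitSends(1));
        sketch.stopCatchUpThread();
    }
}
```

Guarding both flags with the same monitor means a start or stop signal cannot be lost between the flag check and the call to wait().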
-
sendProductsCreatedAfter
protected void sendProductsCreatedAfter() throws java.io.IOException
Send an "action"="products_created_after" request, which is part of the catch up process. The server will reply with zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.
Throws: java.io.IOException - if an I/O error occurs
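For orientation, a request of this kind might be serialized as shown below. Only the "action" value comes from the description above; the `created_after` field name and the ISO-8601 timestamp encoding are assumptions, and `ProductsCreatedAfterRequestSketch` is a hypothetical helper that builds the string by hand instead of using javax.json.

```java
import java.time.Instant;

public class ProductsCreatedAfterRequestSketch {
    /**
     * Build the request body for a catch up query.
     * ASSUMPTION: the timestamp travels in a "created_after" field as ISO-8601 text.
     */
    public static String buildRequest(Instant createdAfter) {
        // Instant.toString() never contains quotes or backslashes, so no escaping is needed.
        return String.format(
                "{\"action\":\"products_created_after\",\"created_after\":\"%s\"}",
                createdAfter);
    }

    public static void main(String[] args) {
        System.out.println(buildRequest(Instant.parse("2024-01-01T00:00:00Z")));
    }
}
```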
-
startCatchUp
protected void startCatchUp()
Notify running background thread to start catch up process.
-
startCatchUpThread
protected void startCatchUpThread()
Start background thread for catch up process.
-
stopCatchUp
protected void stopCatchUp()
Notify running background thread to stop catch up process.
-
stopCatchUpThread
protected void stopCatchUpThread()
Stop background thread for catch up process.
-
onConnectFail
public void onConnectFail()
Description copied from interface: WebSocketListener
Interface method to be overridden by WebSocket files and AwsProductReceiver.
Specified by: onConnectFail in interface WebSocketListener
-
onReconnectFail
public void onReconnectFail()
Description copied from interface: WebSocketListener
Interface method to be overridden by WebSocket files and AwsProductReceiver.
Specified by: onReconnectFail in interface WebSocketListener
-
startup
public void startup() throws java.lang.Exception
Reads createdAfter from a tracking file if it exists, then connects to web socket.
Specified by: startup in interface Configurable
Overrides: startup in class DefaultNotificationReceiver
Throws: java.lang.Exception - if an error occurs
-
shutdown
public void shutdown() throws java.lang.Exception
Closes web socket.
Specified by: shutdown in interface Configurable
Overrides: shutdown in class DefaultNotificationReceiver
Throws: java.lang.Exception - if an error occurs
-
readTrackingData
public javax.json.JsonObject readTrackingData() throws java.lang.Exception
Reads tracking file.
Returns: JsonObject with the tracking file contents
Throws: java.lang.Exception - if an error occurs
-
writeTrackingData
public void writeTrackingData() throws java.lang.Exception
Writes tracking file.
Throws: java.lang.Exception - if an error occurs
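A round trip through a tracking file could look roughly like the sketch below. It assumes the file holds a small JSON object with a `created_after` timestamp. That layout is a guess based on startup() reading createdAfter from the tracking file, and the real class works with javax.json rather than a regular expression. `TrackingFileSketch` and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TrackingFileSketch {
    // ASSUMED file layout: {"created_after":"<ISO-8601 instant>"}
    private static final Pattern CREATED_AFTER =
            Pattern.compile("\"created_after\"\\s*:\\s*\"([^\"]+)\"");

    public static void write(Path file, Instant createdAfter) {
        String json = "{\"created_after\":\"" + createdAfter + "\"}";
        try {
            Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the stored timestamp, or null if the file is missing or has no timestamp. */
    public static Instant read(Path file) {
        if (!Files.exists(file)) {
            return null;
        }
        try {
            String json = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Matcher matcher = CREATED_AFTER.matcher(json);
            return matcher.find() ? Instant.parse(matcher.group(1)) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Write a value to a temporary file, read it back, and clean up. */
    public static Instant roundTrip(Instant value) {
        try {
            Path file = Files.createTempFile("AwsReceiverSketch", ".json");
            try {
                write(file, value);
                return read(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Instant.parse("2024-01-01T00:00:00Z")));
    }
}
```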
-
getURI
public java.net.URI getURI()
Getter for URI.
Returns: URI
-
setURI
public void setURI(java.net.URI uri)
Setter for URI.
Parameters: uri - URI
-
getTrackingFileName
public java.lang.String getTrackingFileName()
Getter for trackingFileName.
Returns: name of tracking file
-
setTrackingFileName
public void setTrackingFileName(java.lang.String trackingFileName)
Setter for trackingFileName.
Parameters: trackingFileName - name of tracking file
-
getCreatedAfter
public java.time.Instant getCreatedAfter()
Getter for createdAfter.
Returns: createdAfter
-
setCreatedAfter
public void setCreatedAfter(java.time.Instant createdAfter)
Setter for createdAfter.
Parameters: createdAfter - createdAfter
-
getAttempts
public int getAttempts()
Getter for attempts.
Returns: attempts
-
setAttempts
public void setAttempts(int attempts)
Setter for attempts.
Parameters: attempts - attempts
-
getTimeout
public long getTimeout()
Getter for timeout.
Returns: timeout
-
setTimeout
public void setTimeout(long timeout)
Setter for timeout.
Parameters: timeout - long timeout
-
-