Class AwsProductReceiver

  • All Implemented Interfaces:
    NotificationIndexCleanup.Listener, NotificationReceiver, WebSocketListener, Configurable, java.lang.Runnable

    public class AwsProductReceiver
    extends DefaultNotificationReceiver
    implements java.lang.Runnable, WebSocketListener
    Receives notifications from a PDL notification web socket. After the initial connection, broadcasts are saved but not processed until the catch up process is complete. Catch up sends a "products_created_after" request with the latest notification "created" timestamp, then processes products until either the last product matches the last broadcast or there are no more products after the latest notification "created" timestamp.
    • Field Detail

      • LOGGER

        public static final java.util.logging.Logger LOGGER
        Logger for this class.
      • URI_PROPERTY

        public static final java.lang.String URI_PROPERTY
        Property name for the URI setting.
        See Also:
        Constant Field Values
      • CREATED_AFTER_PROPERTY

        public static final java.lang.String CREATED_AFTER_PROPERTY
        Property name for the createdAfter setting.
        See Also:
        Constant Field Values
      • TRACKING_INDEX_PROPERTY

        public static final java.lang.String TRACKING_INDEX_PROPERTY
        Property name for the trackingIndex setting.
        See Also:
        Constant Field Values
      • TRACKING_FILE_NAME_PROPERTY

        public static final java.lang.String TRACKING_FILE_NAME_PROPERTY
        Property name for the trackingFileName setting.
        See Also:
        Constant Field Values
      • CONNECT_ATTEMPTS_PROPERTY

        public static final java.lang.String CONNECT_ATTEMPTS_PROPERTY
        Property name for the connectAttempts setting.
        See Also:
        Constant Field Values
      • CONNECT_TIMEOUT_PROPERTY

        public static final java.lang.String CONNECT_TIMEOUT_PROPERTY
        Property name for the connectTimeout setting.
        See Also:
        Constant Field Values
      • INITIAL_CATCHUP_AGE_PROPERTY

        public static final java.lang.String INITIAL_CATCHUP_AGE_PROPERTY
        Property name for the initialCatchUpAge setting.
        See Also:
        Constant Field Values
      • DEFAULT_TRACKING_FILE_NAME

        public static final java.lang.String DEFAULT_TRACKING_FILE_NAME
        Default tracking file name: data/AwsReceiver.json
        See Also:
        Constant Field Values
      • DEFAULT_CONNECT_ATTEMPTS

        public static final java.lang.String DEFAULT_CONNECT_ATTEMPTS
        Default number of connect attempts: 5
        See Also:
        Constant Field Values
      • DEFAULT_CONNECT_TIMEOUT

        public static final java.lang.String DEFAULT_CONNECT_TIMEOUT
        Default connect timeout: 1000
        See Also:
        Constant Field Values
      • DEFAULT_INITIAL_CATCHUP_AGE

        public static final java.lang.String DEFAULT_INITIAL_CATCHUP_AGE
        Default initial catch up age: 7.0
        See Also:
        Constant Field Values
      • createdAfter

        protected java.time.Instant createdAfter
        Microsecond timestamp of the last message that has been processed.
      • initialCatchUpAge

        protected double initialCatchUpAge
        How far back to check when first connecting.
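A minimal sketch of how a starting timestamp could be derived from initialCatchUpAge when no earlier timestamp is known. The unit (days) is an assumption, since this page does not state it, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; not part of AwsProductReceiver. */
class InitialCatchUpSketch {

    /**
     * Returns the instant that lies ageInDays before now.
     * ASSUMPTION: the age is expressed in days.
     */
    static Instant initialCreatedAfter(Instant now, double ageInDays) {
        // Convert fractional days to whole milliseconds.
        long millis = Math.round(ageInDays * 24 * 60 * 60 * 1000);
        return now.minus(Duration.ofMillis(millis));
    }
}
```

Passing the current time as a parameter keeps the calculation easy to test.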
      • lastBroadcast

        protected JsonNotification lastBroadcast
        last broadcast message that has been processed (used for catch up)
      • lastBroadcastId

        protected java.lang.Long lastBroadcastId
        ID of lastBroadcast.
      • processBroadcast

        protected boolean processBroadcast
        whether to process broadcast messages (after catching up).
      • catchUpRunning

        protected boolean catchUpRunning
        whether catch up is currently running.
      • catchUpSync

        protected final java.lang.Object catchUpSync
        sync object for catchUp state.
      • catchUpThread

        protected java.lang.Thread catchUpThread
        thread where catch up process runs.
      • catchUpThreadRunning

        protected boolean catchUpThreadRunning
        whether thread should continue running (shutdown flag)
      • lastCatchUpSent

        protected java.time.Instant lastCatchUpSent
        last catch up message sent (for response timeouts)
    • Constructor Detail

      • AwsProductReceiver

        public AwsProductReceiver()
    • Method Detail

      • configure

        public void configure​(Config config)
                       throws java.lang.Exception
        Description copied from class: DefaultConfigurable
        Process configuration settings. Called before startup().
        Specified by:
        configure in interface Configurable
        Overrides:
        configure in class DefaultNotificationReceiver
        Parameters:
        config - the Config object with settings.
        Throws:
        java.lang.Exception - if configuration exceptions occur.
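The sketch below illustrates reading settings with the defaults listed under Field Detail. It uses java.util.Properties as a stand-in for Config, and the key names are assumptions inferred from the field descriptions, not values confirmed by this page.

```java
import java.util.Properties;

/** Hypothetical settings holder; not the real AwsProductReceiver. */
class ReceiverSettingsSketch {
    // Defaults as listed under Field Detail (stored as strings, then parsed).
    static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
    static final String DEFAULT_CONNECT_ATTEMPTS = "5";
    static final String DEFAULT_CONNECT_TIMEOUT = "1000";
    static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";

    final String trackingFileName;
    final int attempts;
    final long timeout;
    final double initialCatchUpAge;

    ReceiverSettingsSketch(Properties props) {
        // ASSUMPTION: key names mirror the setting names in the field descriptions.
        trackingFileName = props.getProperty("trackingFileName", DEFAULT_TRACKING_FILE_NAME);
        attempts = Integer.parseInt(
                props.getProperty("connectAttempts", DEFAULT_CONNECT_ATTEMPTS));
        timeout = Long.parseLong(
                props.getProperty("connectTimeout", DEFAULT_CONNECT_TIMEOUT));
        initialCatchUpAge = Double.parseDouble(
                props.getProperty("initialCatchUpAge", DEFAULT_INITIAL_CATCHUP_AGE));
    }
}
```

Keeping the defaults as strings matches the declared type of the DEFAULT_ constants, so a missing setting and an explicit setting follow the same parsing path.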
      • onOpen

        public void onOpen​(javax.websocket.Session session)
                    throws java.io.IOException
        Called when connection is first opened. Start catch up process.
        Specified by:
        onOpen in interface WebSocketListener
        Parameters:
        session - the session that was opened
        Throws:
        java.io.IOException - IOException
      • onClose

        public void onClose​(javax.websocket.Session session,
                            javax.websocket.CloseReason closeReason)
        Called when connection is closed, either because shutdown on this end or closed by server.
        Specified by:
        onClose in interface WebSocketListener
        Parameters:
        session - the session that was closed
        closeReason - Reason for closing session
      • onMessage

        public void onMessage​(java.lang.String message)
                       throws java.io.IOException
        Message handler function passed to WebSocketClient. Parses the message as JSON and checks the "action" property to route the message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares the state of the latest product to determine whether the receiver has caught up and whether broadcasts should be processed.
        Specified by:
        onMessage in interface WebSocketListener
        Parameters:
        message - the notification message, as a string
        Throws:
        java.io.IOException - IOException
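The routing described for onMessage might look roughly like the sketch below. To stay dependency-free it extracts "action" with a regular expression instead of a JSON parser, and it records the handler name rather than calling the real handlers; the class name and the "ignored" fallback are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical router; the real class parses the message as JSON. */
class MessageRouterSketch {
    // Matches an "action":"value" pair. Illustration only, not a JSON parser.
    private static final Pattern ACTION =
            Pattern.compile("\"action\"\\s*:\\s*\"([^\"]*)\"");

    /** Names of the handlers that would have been called, in order. */
    final List<String> handled = new ArrayList<>();

    // Synchronized so messages are handled one at a time, in arrival order.
    synchronized void onMessage(String message) {
        Matcher m = ACTION.matcher(message);
        String action = m.find() ? m.group(1) : null;
        if ("broadcast".equals(action)) {
            handled.add("onBroadcast");
        } else if ("product".equals(action)) {
            handled.add("onProduct");
        } else if ("products_created_after".equals(action)) {
            handled.add("onProductsCreatedAfter");
        } else {
            handled.add("ignored"); // ASSUMPTION: unknown actions are skipped
        }
    }
}
```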
      • onBroadcast

        protected void onBroadcast​(javax.json.JsonObject json)
                            throws java.lang.Exception
        Handle a message with "action"="broadcast". If caught up, process the notification as usual; otherwise save the notification to help detect when caught up.
        Parameters:
        json - JSON Message
        Throws:
        java.lang.Exception - Exception
      • onJsonNotification

        protected void onJsonNotification​(JsonNotification notification)
                                   throws java.lang.Exception
        Process a received notification and update current "created" timestamp.
        Parameters:
        notification - JSON Notification
        Throws:
        java.lang.Exception - Exception
      • onProduct

        protected void onProduct​(javax.json.JsonObject json)
                          throws java.lang.Exception
        Handle a message with "action"="product", which is received during catch up.
        Parameters:
        json - JSON Message
        Throws:
        java.lang.Exception - Exception
      • onProductsCreatedAfter

        protected void onProductsCreatedAfter​(javax.json.JsonObject json)
                                       throws java.lang.Exception
        Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Check whether caught up, and either switch to broadcast mode or continue catch up process.
        Parameters:
        json - JSON Message
        Throws:
        java.lang.Exception - Exception
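Taken together, onBroadcast, onProduct, and onProductsCreatedAfter behave like a small state machine. The sketch below models that with plain IDs instead of JSON messages; lastProductId, productsInResponse, and requestsSent are hypothetical names introduced for illustration, and the real class may detect the caught-up condition differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the catch up state; not the real implementation. */
class CatchUpStateSketch {
    boolean processBroadcast = false; // true once caught up
    Long lastBroadcastId = null;      // latest broadcast seen while catching up
    Long lastProductId = null;        // latest product received during catch up
    int productsInResponse = 0;       // products seen since the last request
    int requestsSent = 0;             // stands in for sendProductsCreatedAfter()
    final List<Long> processed = new ArrayList<>();

    void onBroadcast(long id) {
        if (processBroadcast) {
            processed.add(id);        // caught up: process as usual
        } else {
            lastBroadcastId = id;     // remember it to detect when caught up
        }
    }

    void onProduct(long id) {
        processed.add(id);
        lastProductId = id;
        productsInResponse++;
    }

    void onProductsCreatedAfter() {
        boolean matchesBroadcast =
                lastProductId != null && lastProductId.equals(lastBroadcastId);
        boolean noMoreProducts = productsInResponse == 0;
        productsInResponse = 0;
        if (matchesBroadcast || noMoreProducts) {
            processBroadcast = true;  // switch to broadcast mode
        } else {
            requestsSent++;           // continue catch up with another request
        }
    }
}
```

For example, a broadcast with ID 3 that arrives during catch up is only remembered; once a catch up response ends with product 3, the receiver switches to broadcast mode and processes broadcast 4 directly.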
      • sendProductsCreatedAfter

        protected void sendProductsCreatedAfter()
                                         throws java.io.IOException
        Send an "action"="products_created_after" request, which is part of the catch up process. The server will reply with zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.
        Throws:
        java.io.IOException - IOException
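As an illustration, the request could be serialized as below. The "created_after" field name and the ISO-8601 timestamp format are assumptions; this page only confirms the "action" value.

```java
import java.time.Instant;

/** Hypothetical request builder; field names other than "action" are assumed. */
class CatchUpRequestSketch {

    static String productsCreatedAfterRequest(Instant createdAfter) {
        // Instant.toString() yields ISO-8601 text, which needs no JSON escaping.
        return "{\"action\":\"products_created_after\","
                + "\"created_after\":\"" + createdAfter + "\"}";
    }
}
```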
      • startCatchUp

        protected void startCatchUp()
        Notify running background thread to start catch up process.
      • startCatchUpThread

        protected void startCatchUpThread()
        Start background thread for catch up process.
      • stopCatchUp

        protected void stopCatchUp()
        Notify running background thread to stop catch up process.
      • stopCatchUpThread

        protected void stopCatchUpThread()
        Stop background thread for catch up process.
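The four methods above suggest a wait/notify pattern around catchUpSync. The sketch below is one way such a background thread could be coordinated; each "pass" is a stand-in for sending a catch up request, and awaitPasses and getPasses are hypothetical helpers added so the behavior can be observed.

```java
/** Hypothetical worker; mirrors the field names on this page for readability. */
class CatchUpWorkerSketch {
    private final Object catchUpSync = new Object();
    private boolean catchUpRunning = false;       // a catch up pass is requested
    private boolean catchUpThreadRunning = false; // shutdown flag
    private Thread catchUpThread;
    private int passes = 0;

    void startCatchUpThread() {
        synchronized (catchUpSync) {
            catchUpThreadRunning = true;
        }
        catchUpThread = new Thread(this::runLoop, "catch-up-sketch");
        catchUpThread.start();
    }

    void startCatchUp() { setCatchUpRunning(true); }

    void stopCatchUp() { setCatchUpRunning(false); }

    void stopCatchUpThread() {
        synchronized (catchUpSync) {
            catchUpThreadRunning = false;
            catchUpSync.notifyAll(); // wake the thread so it can exit
        }
        if (catchUpThread == null) {
            return;
        }
        try {
            catchUpThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until at least n passes have run (test helper). */
    void awaitPasses(int n) {
        synchronized (catchUpSync) {
            while (passes < n) {
                try {
                    catchUpSync.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    int getPasses() {
        synchronized (catchUpSync) {
            return passes;
        }
    }

    private void setCatchUpRunning(boolean running) {
        synchronized (catchUpSync) {
            catchUpRunning = running;
            catchUpSync.notifyAll();
        }
    }

    private void runLoop() {
        synchronized (catchUpSync) {
            while (catchUpThreadRunning) {
                if (catchUpRunning) {
                    passes++;               // stand-in for sending a request
                    catchUpRunning = false; // simplification: one pass per start
                    catchUpSync.notifyAll();
                } else {
                    try {
                        catchUpSync.wait(); // releases the lock while idle
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}
```

Separating the "should a pass run" flag from the "should the thread live" flag lets the thread idle between catch up runs without being recreated on each reconnect.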
      • onConnectFail

        public void onConnectFail()
        Description copied from interface: WebSocketListener
        Interface method to be overridden by WebSocket files and AwsProductReceiver
        Specified by:
        onConnectFail in interface WebSocketListener
      • startup

        public void startup()
                     throws java.lang.Exception
        Reads createdAfter from a tracking file if it exists, then connects to web socket.
        Specified by:
        startup in interface Configurable
        Overrides:
        startup in class DefaultNotificationReceiver
        Throws:
        java.lang.Exception - Exception
      • readTrackingData

        public javax.json.JsonObject readTrackingData()
                                               throws java.lang.Exception
        Reads tracking file.
        Returns:
        the tracking file contents as a JsonObject
        Throws:
        java.lang.Exception - Exception
      • writeTrackingData

        public void writeTrackingData()
                               throws java.lang.Exception
        Writes tracking file.
        Throws:
        java.lang.Exception - Exception
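A rough sketch of persisting createdAfter between runs, as startup, readTrackingData, and writeTrackingData describe. The file layout (a single "createdAfter" string field) is an assumption, and the regex-based read is only a dependency-free stand-in for real JSON parsing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical tracking file helper; the real file format may differ. */
class TrackingFileSketch {
    // ASSUMPTION: the file stores {"createdAfter":"<ISO-8601 timestamp>"}.
    private static final Pattern CREATED_AFTER =
            Pattern.compile("\"createdAfter\"\\s*:\\s*\"([^\"]*)\"");

    static void write(Path file, Instant createdAfter) {
        String json = "{\"createdAfter\":\"" + createdAfter + "\"}";
        try {
            Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the stored timestamp, or null if the file or field is missing. */
    static Instant read(Path file) {
        if (!Files.exists(file)) {
            return null;
        }
        try {
            String json = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Matcher m = CREATED_AFTER.matcher(json);
            return m.find() ? Instant.parse(m.group(1)) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Returning null for a missing file lets the caller fall back to another starting point on first run.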
      • getURI

        public java.net.URI getURI()
        Getter for URI
        Returns:
        URI
      • setURI

        public void setURI​(java.net.URI uri)
        Setter for URI
        Parameters:
        uri - URI
      • getTrackingFileName

        public java.lang.String getTrackingFileName()
        Getter for trackingFileName
        Returns:
        name of tracking file
      • setTrackingFileName

        public void setTrackingFileName​(java.lang.String trackingFileName)
        Setter for trackingFileName
        Parameters:
        trackingFileName - trackingFileName
      • getCreatedAfter

        public java.time.Instant getCreatedAfter()
        Getter for createdAfter
        Returns:
        createdAfter
      • setCreatedAfter

        public void setCreatedAfter​(java.time.Instant createdAfter)
        Setter for createdAfter
        Parameters:
        createdAfter - createdAfter
      • getAttempts

        public int getAttempts()
        Getter for attempts
        Returns:
        attempts
      • setAttempts

        public void setAttempts​(int attempts)
        Setter for attempts
        Parameters:
        attempts - attempts
      • getTimeout

        public long getTimeout()
        Getter for timeout
        Returns:
        timeout
      • setTimeout

        public void setTimeout​(long timeout)
        Setter for timeout
        Parameters:
        timeout - long timeout