WebSocketNotificationReceiver.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.util.Config;
  3. import gov.usgs.util.FileUtils;
  4. import gov.usgs.util.StreamUtils;

  5. import javax.json.Json;
  6. import javax.json.JsonObject;
  7. import javax.json.JsonReader;
  8. import javax.websocket.CloseReason;
  9. import javax.websocket.Session;
  10. import java.io.ByteArrayInputStream;
  11. import java.io.File;
  12. import java.io.InputStream;
  13. import java.net.URI;
  14. import java.util.logging.Level;
  15. import java.util.logging.Logger;

  16. /**
  17.  * Receives notifications from an arbitrary web socket.
  18.  */
  19. public class WebSocketNotificationReceiver extends DefaultNotificationReceiver implements WebSocketListener {

  20.   /** Logger for use in the file */
  21.   public static final Logger LOGGER = Logger
  22.           .getLogger(WebSocketNotificationReceiver.class.getName());

  23.   /** Property for serverHost */
  24.   public static final String SERVER_HOST_PROPERTY = "serverHost";
  25.   /** Property for serverPort */
  26.   public static final String SERVER_PORT_PROPERTY = "serverPort";
  27.   /** Property for serverPath */
  28.   public static final String SERVER_PATH_PROPERTY = "serverPath";
  29.   /** Property for sequence */
  30.   public static final String SEQUENCE_PROPERTY = "sequence";
  31.   /** Property for timestamp */
  32.   public static final String TIMESTAMP_PROPERTY = "timestamp";
  33.   /** Property for trackingFileName */
  34.   public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  35.   /** Property for connectAttempts */
  36.   public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  37.   /** Property for connectTimeout */
  38.   public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  39.   /** Property for retryOnClose */
  40.   public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";

  41.   /** Default server host */
  42.   public static final String DEFAULT_SERVER_HOST = "http://www.google.com";
  43.   /** Default server port */
  44.   public static final String DEFAULT_SERVER_PORT = "4222";
  45.   /** Default server path */
  46.   public static final String DEFAULT_SERVER_PATH = "/sequence/";
  47.   /** Default tracking file */
  48.   public static final String DEFAULT_TRACKING_FILE_NAME = "data/WebSocketReceiverInfo";
  49.   /** Default number of connect attempts */
  50.   public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  51.   /** Default timeout in ms */
  52.   public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  53.   /** Default condiction for retry on close */
  54.   public static final String DEFAULT_RETRY_ON_CLOSE = "true";
  55.   /** attribute for data */
  56.   public static final String ATTRIBUTE_DATA = "data";

  57.   private String serverHost;
  58.   private String serverPort;
  59.   private String serverPath;
  60.   private String trackingFileName;
  61.   private int attempts;
  62.   private long timeout;

  63.   private WebSocketClient client;
  64.   private String sequence = "0";


  65.   @Override
  66.   public void configure(Config config) throws Exception {
  67.     super.configure(config);

  68.     serverHost = config.getProperty(SERVER_HOST_PROPERTY, DEFAULT_SERVER_HOST);
  69.     serverPort = config.getProperty(SERVER_PORT_PROPERTY, DEFAULT_SERVER_PORT);
  70.     serverPath = config.getProperty(SERVER_PATH_PROPERTY, DEFAULT_SERVER_PATH);
  71.     attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
  72.     timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
  73.     trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME);
  74.   }

  75.   /**
  76.    * Reads a sequence from a tracking file if it exists. Otherwise, starting sequence is 0.
  77.    * Connects to web socket
  78.    * @throws Exception if error occurs
  79.    */
  80.   @Override
  81.   public void startup() throws Exception{
  82.     super.startup();

  83.     //read sequence from tracking file if other parameters agree
  84.     JsonObject json = readTrackingFile();
  85.     if (json != null &&
  86.             json.getString(SERVER_HOST_PROPERTY).equals(serverHost) &&
  87.             json.getString(SERVER_PORT_PROPERTY).equals(serverPort) &&
  88.             json.getString(SERVER_PATH_PROPERTY).equals(serverPath)) {
  89.       sequence = json.getString(SEQUENCE_PROPERTY);
  90.     }

  91.     //open websocket
  92.     client = new WebSocketClient(new URI(serverHost + ":" + serverPort + serverPath + sequence), this, attempts, timeout, true);
  93.   }

  94.   /**
  95.    * Closes web socket
  96.    * @throws Exception if error occurs
  97.    */
  98.   @Override
  99.   public void shutdown() throws Exception{
  100.     //close socket
  101.     client.shutdown();
  102.     super.shutdown();
  103.   }

  104.   /**
  105.    * Writes tracking file to disc, storing latest sequence
  106.    * @throws Exception if error occurs
  107.    */
  108.   public void writeTrackingFile() throws Exception {
  109.     JsonObject json = Json.createObjectBuilder()
  110.             .add(SERVER_HOST_PROPERTY,serverHost)
  111.             .add(SERVER_PATH_PROPERTY,serverPath)
  112.             .add(SERVER_PORT_PROPERTY,serverPort)
  113.             .add(SEQUENCE_PROPERTY,sequence)
  114.             .build();

  115.     FileUtils.writeFileThenMove(
  116.             new File(trackingFileName + "_tmp.json"),
  117.             new File(trackingFileName + ".json"),
  118.             json.toString().getBytes());
  119.   }

  120.   /**
  121.    * Reads tracking file from disc
  122.    * @return  JsonObject tracking file
  123.    * @throws Exception if error occurs
  124.    */
  125.   public JsonObject readTrackingFile() throws Exception {
  126.     JsonObject json = null;

  127.     File trackingFile = new File(trackingFileName + ".json");
  128.     if (trackingFile.exists()) {
  129.       InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
  130.       JsonReader jsonReader = Json.createReader(contents);
  131.       json = jsonReader.readObject();
  132.       jsonReader.close();
  133.     }
  134.     return json;
  135.   }

  136.   @Override
  137.   public void onOpen(Session session) {
  138.     // do nothing
  139.   }

  140.   /**
  141.    * Message handler function passed to WebSocketClient
  142.    * Parses the message as JSON, receives the contained URL notification, and writes the tracking file.
  143.    * @param message String
  144.    */
  145.   @Override
  146.   public void onMessage(String message) {
  147.     JsonObject json;
  148.     try (InputStream in = StreamUtils.getInputStream(message); JsonReader reader = Json.createReader(in)) {
  149.       //parse input as json
  150.       json = reader.readObject();
  151.     } catch (Exception e) {
  152.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception while receiving notification; is it encoded as JSON? ", e);
  153.       return;
  154.     }
  155.     try {
  156.       //convert to URLNotification and receive
  157.       JsonObject dataJson = json.getJsonObject(ATTRIBUTE_DATA);
  158.       URLNotification notification = URLNotificationJSONConverter.parseJSON(dataJson);
  159.       receiveNotification(notification);

  160.       //send heartbeat
  161.       HeartbeatListener.sendHeartbeatMessage(getName(), "nats notification timestamp", json.getString(TIMESTAMP_PROPERTY));

  162.       //write tracking file
  163.       sequence = json.getJsonNumber(SEQUENCE_PROPERTY).toString();
  164.       writeTrackingFile();
  165.     } catch (Exception e) {
  166.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing URLNotification ", e);
  167.     }
  168.   }

  169.   @Override
  170.   public void onClose(Session session, CloseReason closeReason) {
  171.     // do nothing
  172.   }

  173.   @Override
  174.   public void onConnectFail() {
  175.     // do nothing
  176.   }

  177.   @Override
  178.   public void onReconnectFail() {
  179.     // do nothing
  180.   }

  181.   /** @return serverHost */
  182.   public String getServerHost() {
  183.     return serverHost;
  184.   }

  185.   /** @param serverHost to set */
  186.   public void setServerHost(String serverHost) {
  187.     this.serverHost = serverHost;
  188.   }

  189.   /** @return serverPort */
  190.   public String getServerPort() {
  191.     return serverPort;
  192.   }

  193.   /** @param serverPort to set */
  194.   public void setServerPort(String serverPort) {
  195.     this.serverPort = serverPort;
  196.   }

  197.   /** @return serverPath */
  198.   public String getServerPath() {
  199.     return serverPath;
  200.   }

  201.   /** @param serverPath to set */
  202.   public void setServerPath(String serverPath) {
  203.     this.serverPath = serverPath;
  204.   }

  205.   /** @return trackingFileName */
  206.   public String getTrackingFileName() {
  207.     return trackingFileName;
  208.   }

  209.   /** @param trackingFileName to set */
  210.   public void setTrackingFileName(String trackingFileName) {
  211.     this.trackingFileName = trackingFileName;
  212.   }

  213.   /** @return sequence */
  214.   public String getSequence() {
  215.     return sequence;
  216.   }

  217.   /** @param sequence to set */
  218.   public void setSequence(String sequence) {
  219.     this.sequence = sequence;
  220.   }

  221.   /** @return attempts */
  222.   public int getAttempts() {
  223.     return attempts;
  224.   }

  225.   /** @param attempts to set */
  226.   public void setAttempts(int attempts) {
  227.     this.attempts = attempts;
  228.   }

  229.   /** @return timeout */
  230.   public long getTimeout() {
  231.     return timeout;
  232.   }

  233.   /** @param timeout to set */
  234.   public void setTimeout(long timeout) {
  235.     this.timeout = timeout;
  236.   }
  237. }