WebSocketNotificationReceiver.java

package gov.usgs.earthquake.distribution;

import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StreamUtils;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Receives notifications from an arbitrary web socket.
 */
public class WebSocketNotificationReceiver extends DefaultNotificationReceiver implements WebSocketListener {

  /** Logger for use in the file */
  public static final Logger LOGGER = Logger
          .getLogger(WebSocketNotificationReceiver.class.getName());

  /** Property for serverHost */
  public static final String SERVER_HOST_PROPERTY = "serverHost";
  /** Property for serverPort */
  public static final String SERVER_PORT_PROPERTY = "serverPort";
  /** Property for serverPath */
  public static final String SERVER_PATH_PROPERTY = "serverPath";
  /** Property for sequence */
  public static final String SEQUENCE_PROPERTY = "sequence";
  /** Property for timestamp */
  public static final String TIMESTAMP_PROPERTY = "timestamp";
  /** Property for trackingFileName */
  public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  /** Property for connectAttempts */
  public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  /** Property for connectTimeout */
  public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  /** Property for retryOnClose */
  public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";

  /** Default server host */
  public static final String DEFAULT_SERVER_HOST = "http://www.google.com";
  /** Default server port */
  public static final String DEFAULT_SERVER_PORT = "4222";
  /** Default server path */
  public static final String DEFAULT_SERVER_PATH = "/sequence/";
  /** Default tracking file */
  public static final String DEFAULT_TRACKING_FILE_NAME = "data/WebSocketReceiverInfo";
  /** Default number of connect attempts */
  public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  /** Default timeout in ms */
  public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  /** Default condiction for retry on close */
  public static final String DEFAULT_RETRY_ON_CLOSE = "true";
  /** attribute for data */
  public static final String ATTRIBUTE_DATA = "data";

  private String serverHost;
  private String serverPort;
  private String serverPath;
  private String trackingFileName;
  private int attempts;
  private long timeout;

  private WebSocketClient client;
  private String sequence = "0";


  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    serverHost = config.getProperty(SERVER_HOST_PROPERTY, DEFAULT_SERVER_HOST);
    serverPort = config.getProperty(SERVER_PORT_PROPERTY, DEFAULT_SERVER_PORT);
    serverPath = config.getProperty(SERVER_PATH_PROPERTY, DEFAULT_SERVER_PATH);
    attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
    timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
    trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME);
  }

  /**
   * Reads a sequence from a tracking file if it exists. Otherwise, starting sequence is 0.
   * Connects to web socket
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception{
    super.startup();

    //read sequence from tracking file if other parameters agree
    JsonObject json = readTrackingFile();
    if (json != null &&
            json.getString(SERVER_HOST_PROPERTY).equals(serverHost) &&
            json.getString(SERVER_PORT_PROPERTY).equals(serverPort) &&
            json.getString(SERVER_PATH_PROPERTY).equals(serverPath)) {
      sequence = json.getString(SEQUENCE_PROPERTY);
    }

    //open websocket
    client = new WebSocketClient(new URI(serverHost + ":" + serverPort + serverPath + sequence), this, attempts, timeout, true);
  }

  /**
   * Closes web socket
   * @throws Exception if error occurs
   */
  @Override
  public void shutdown() throws Exception{
    //close socket
    client.shutdown();
    super.shutdown();
  }

  /**
   * Writes tracking file to disc, storing latest sequence
   * @throws Exception if error occurs
   */
  public void writeTrackingFile() throws Exception {
    JsonObject json = Json.createObjectBuilder()
            .add(SERVER_HOST_PROPERTY,serverHost)
            .add(SERVER_PATH_PROPERTY,serverPath)
            .add(SERVER_PORT_PROPERTY,serverPort)
            .add(SEQUENCE_PROPERTY,sequence)
            .build();

    FileUtils.writeFileThenMove(
            new File(trackingFileName + "_tmp.json"),
            new File(trackingFileName + ".json"),
            json.toString().getBytes());
  }

  /**
   * Reads tracking file from disc
   * @return  JsonObject tracking file
   * @throws Exception if error occurs
   */
  public JsonObject readTrackingFile() throws Exception {
    JsonObject json = null;

    File trackingFile = new File(trackingFileName + ".json");
    if (trackingFile.exists()) {
      InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
      JsonReader jsonReader = Json.createReader(contents);
      json = jsonReader.readObject();
      jsonReader.close();
    }
    return json;
  }

  @Override
  public void onOpen(Session session) {
    // do nothing
  }

  /**
   * Message handler function passed to WebSocketClient
   * Parses the message as JSON, receives the contained URL notification, and writes the tracking file.
   * @param message String
   */
  @Override
  public void onMessage(String message) {
    JsonObject json;
    try (InputStream in = StreamUtils.getInputStream(message); JsonReader reader = Json.createReader(in)) {
      //parse input as json
      json = reader.readObject();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while receiving notification; is it encoded as JSON? ", e);
      return;
    }
    try {
      //convert to URLNotification and receive
      JsonObject dataJson = json.getJsonObject(ATTRIBUTE_DATA);
      URLNotification notification = URLNotificationJSONConverter.parseJSON(dataJson);
      receiveNotification(notification);

      //send heartbeat
      HeartbeatListener.sendHeartbeatMessage(getName(), "nats notification timestamp", json.getString(TIMESTAMP_PROPERTY));

      //write tracking file
      sequence = json.getJsonNumber(SEQUENCE_PROPERTY).toString();
      writeTrackingFile();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing URLNotification ", e);
    }
  }

  @Override
  public void onClose(Session session, CloseReason closeReason) {
    // do nothing
  }

  @Override
  public void onConnectFail() {
    // do nothing
  }

  @Override
  public void onReconnectFail() {
    // do nothing
  }

  /** @return serverHost */
  public String getServerHost() {
    return serverHost;
  }

  /** @param serverHost to set */
  public void setServerHost(String serverHost) {
    this.serverHost = serverHost;
  }

  /** @return serverPort */
  public String getServerPort() {
    return serverPort;
  }

  /** @param serverPort to set */
  public void setServerPort(String serverPort) {
    this.serverPort = serverPort;
  }

  /** @return serverPath */
  public String getServerPath() {
    return serverPath;
  }

  /** @param serverPath to set */
  public void setServerPath(String serverPath) {
    this.serverPath = serverPath;
  }

  /** @return trackingFileName */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /** @param trackingFileName to set */
  public void setTrackingFileName(String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /** @return sequence */
  public String getSequence() {
    return sequence;
  }

  /** @param sequence to set */
  public void setSequence(String sequence) {
    this.sequence = sequence;
  }

  /** @return attempts */
  public int getAttempts() {
    return attempts;
  }

  /** @param attempts to set */
  public void setAttempts(int attempts) {
    this.attempts = attempts;
  }

  /** @return timeout */
  public long getTimeout() {
    return timeout;
  }

  /** @param timeout to set */
  public void setTimeout(long timeout) {
    this.timeout = timeout;
  }
}