WebSocketNotificationReceiver.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.util.Config;
- import gov.usgs.util.FileUtils;
- import gov.usgs.util.StreamUtils;
- import javax.json.Json;
- import javax.json.JsonObject;
- import javax.json.JsonReader;
- import javax.websocket.CloseReason;
- import javax.websocket.Session;
- import java.io.ByteArrayInputStream;
- import java.io.File;
- import java.io.InputStream;
- import java.net.URI;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Receives notifications from an arbitrary web socket.
- */
- public class WebSocketNotificationReceiver extends DefaultNotificationReceiver implements WebSocketListener {
- /** Logger for use in the file */
- public static final Logger LOGGER = Logger
- .getLogger(WebSocketNotificationReceiver.class.getName());
- /** Property for serverHost */
- public static final String SERVER_HOST_PROPERTY = "serverHost";
- /** Property for serverPort */
- public static final String SERVER_PORT_PROPERTY = "serverPort";
- /** Property for serverPath */
- public static final String SERVER_PATH_PROPERTY = "serverPath";
- /** Property for sequence */
- public static final String SEQUENCE_PROPERTY = "sequence";
- /** Property for timestamp */
- public static final String TIMESTAMP_PROPERTY = "timestamp";
- /** Property for trackingFileName */
- public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
- /** Property for connectAttempts */
- public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
- /** Property for connectTimeout */
- public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
- /** Property for retryOnClose */
- public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";
- /** Default server host */
- public static final String DEFAULT_SERVER_HOST = "http://www.google.com";
- /** Default server port */
- public static final String DEFAULT_SERVER_PORT = "4222";
- /** Default server path */
- public static final String DEFAULT_SERVER_PATH = "/sequence/";
- /** Default tracking file */
- public static final String DEFAULT_TRACKING_FILE_NAME = "data/WebSocketReceiverInfo";
- /** Default number of connect attempts */
- public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
- /** Default timeout in ms */
- public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
- /** Default condiction for retry on close */
- public static final String DEFAULT_RETRY_ON_CLOSE = "true";
- /** attribute for data */
- public static final String ATTRIBUTE_DATA = "data";
- private String serverHost;
- private String serverPort;
- private String serverPath;
- private String trackingFileName;
- private int attempts;
- private long timeout;
- private WebSocketClient client;
- private String sequence = "0";
- @Override
- public void configure(Config config) throws Exception {
- super.configure(config);
- serverHost = config.getProperty(SERVER_HOST_PROPERTY, DEFAULT_SERVER_HOST);
- serverPort = config.getProperty(SERVER_PORT_PROPERTY, DEFAULT_SERVER_PORT);
- serverPath = config.getProperty(SERVER_PATH_PROPERTY, DEFAULT_SERVER_PATH);
- attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
- timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
- trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME);
- }
- /**
- * Reads a sequence from a tracking file if it exists. Otherwise, starting sequence is 0.
- * Connects to web socket
- * @throws Exception if error occurs
- */
- @Override
- public void startup() throws Exception{
- super.startup();
- //read sequence from tracking file if other parameters agree
- JsonObject json = readTrackingFile();
- if (json != null &&
- json.getString(SERVER_HOST_PROPERTY).equals(serverHost) &&
- json.getString(SERVER_PORT_PROPERTY).equals(serverPort) &&
- json.getString(SERVER_PATH_PROPERTY).equals(serverPath)) {
- sequence = json.getString(SEQUENCE_PROPERTY);
- }
- //open websocket
- client = new WebSocketClient(new URI(serverHost + ":" + serverPort + serverPath + sequence), this, attempts, timeout, true);
- }
- /**
- * Closes web socket
- * @throws Exception if error occurs
- */
- @Override
- public void shutdown() throws Exception{
- //close socket
- client.shutdown();
- super.shutdown();
- }
- /**
- * Writes tracking file to disc, storing latest sequence
- * @throws Exception if error occurs
- */
- public void writeTrackingFile() throws Exception {
- JsonObject json = Json.createObjectBuilder()
- .add(SERVER_HOST_PROPERTY,serverHost)
- .add(SERVER_PATH_PROPERTY,serverPath)
- .add(SERVER_PORT_PROPERTY,serverPort)
- .add(SEQUENCE_PROPERTY,sequence)
- .build();
- FileUtils.writeFileThenMove(
- new File(trackingFileName + "_tmp.json"),
- new File(trackingFileName + ".json"),
- json.toString().getBytes());
- }
- /**
- * Reads tracking file from disc
- * @return JsonObject tracking file
- * @throws Exception if error occurs
- */
- public JsonObject readTrackingFile() throws Exception {
- JsonObject json = null;
- File trackingFile = new File(trackingFileName + ".json");
- if (trackingFile.exists()) {
- InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
- JsonReader jsonReader = Json.createReader(contents);
- json = jsonReader.readObject();
- jsonReader.close();
- }
- return json;
- }
- @Override
- public void onOpen(Session session) {
- // do nothing
- }
- /**
- * Message handler function passed to WebSocketClient
- * Parses the message as JSON, receives the contained URL notification, and writes the tracking file.
- * @param message String
- */
- @Override
- public void onMessage(String message) {
- JsonObject json;
- try (InputStream in = StreamUtils.getInputStream(message); JsonReader reader = Json.createReader(in)) {
- //parse input as json
- json = reader.readObject();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName() + "] exception while receiving notification; is it encoded as JSON? ", e);
- return;
- }
- try {
- //convert to URLNotification and receive
- JsonObject dataJson = json.getJsonObject(ATTRIBUTE_DATA);
- URLNotification notification = URLNotificationJSONConverter.parseJSON(dataJson);
- receiveNotification(notification);
- //send heartbeat
- HeartbeatListener.sendHeartbeatMessage(getName(), "nats notification timestamp", json.getString(TIMESTAMP_PROPERTY));
- //write tracking file
- sequence = json.getJsonNumber(SEQUENCE_PROPERTY).toString();
- writeTrackingFile();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing URLNotification ", e);
- }
- }
- @Override
- public void onClose(Session session, CloseReason closeReason) {
- // do nothing
- }
- @Override
- public void onConnectFail() {
- // do nothing
- }
- @Override
- public void onReconnectFail() {
- // do nothing
- }
- /** @return serverHost */
- public String getServerHost() {
- return serverHost;
- }
- /** @param serverHost to set */
- public void setServerHost(String serverHost) {
- this.serverHost = serverHost;
- }
- /** @return serverPort */
- public String getServerPort() {
- return serverPort;
- }
- /** @param serverPort to set */
- public void setServerPort(String serverPort) {
- this.serverPort = serverPort;
- }
- /** @return serverPath */
- public String getServerPath() {
- return serverPath;
- }
- /** @param serverPath to set */
- public void setServerPath(String serverPath) {
- this.serverPath = serverPath;
- }
- /** @return trackingFileName */
- public String getTrackingFileName() {
- return trackingFileName;
- }
- /** @param trackingFileName to set */
- public void setTrackingFileName(String trackingFileName) {
- this.trackingFileName = trackingFileName;
- }
- /** @return sequence */
- public String getSequence() {
- return sequence;
- }
- /** @param sequence to set */
- public void setSequence(String sequence) {
- this.sequence = sequence;
- }
- /** @return attempts */
- public int getAttempts() {
- return attempts;
- }
- /** @param attempts to set */
- public void setAttempts(int attempts) {
- this.attempts = attempts;
- }
- /** @return timeout */
- public long getTimeout() {
- return timeout;
- }
- /** @param timeout to set */
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
- }