NATSStreamingNotificationReceiver.java

package gov.usgs.earthquake.nats;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Connects directly to a NATS streaming server to receive notifications using a NATSClient
 */
public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {

  private static final Logger LOGGER = Logger
          .getLogger(DefaultNotificationReceiver.class.getName());

  /** Property for tracking file name */
  public static String TRACKING_FILE_NAME_PROPERTY = "trackingFile";
  /** Property on if update sequence should occur after exception */
  public static String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
  /** Property for sequence */
  public static String SEQUENCE_PROPERTY = "sequence";

  /** Name of deafult tracking file */
  public static String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
  /** Default state of update after exception */
  public static String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";

  private NATSClient client = new NATSClient();
  private Subscription subscription;

  private String subject;
  private long sequence = 0;
  private String trackingFileName;
  private boolean updateSequenceAfterException;
  private boolean exceptionThrown = false;

  /**
   * Configures receiver based on included properties
   *
   * @param config
   *            The user-defined configuration
   *
   * @throws Exception If required properties are ignored
   */
  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);
    client.configure(config);

    subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
    if (subject == null) {
      throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
    }

    trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
    updateSequenceAfterException = Boolean.parseBoolean(config.getProperty(
      UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY,
      DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
  }

  /**
   * Does initial tracking file management and subscribes to server
   * With a tracking file, gets the last sequence
   *
   * @throws InterruptedException if interrupted
   * @throws IOException if IO error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();

    //Start client
    client.startup();

    //Check properties if tracking file exists
    JsonObject properties = readTrackingFile();
    if (properties != null &&
        properties.getString(NATSClient.SERVER_HOST_PROPERTY).equals(client.getServerHost()) &&
        properties.getString(NATSClient.SERVER_PORT_PROPERTY).equals(client.getServerPort()) &&
        properties.getString(NATSClient.CLUSTER_ID_PROPERTY).equals(client.getClusterId()) &&
        properties.getString(NATSClient.CLIENT_ID_PROPERTY).equals(client.getClientId()) &&
        properties.getString(NATSClient.SUBJECT_PROPERTY).equals(subject)) {
      sequence = Long.parseLong(properties.get(SEQUENCE_PROPERTY).toString());
    }

    subscription = client.getConnection().subscribe(
      subject,
      this,
      new SubscriptionOptions.Builder().startAtSequence(sequence).build());
    // Always starts at stored sequence; initialized to 0 and overwritten by storage
  }

  /**
   * Closes subscription/connection and writes state in tracking file
   * Wraps each statement in a try/catch to ensure each step still happens
   *
   * @throws IOException if IO error occurs
   * @throws InterruptedException if interrupted
   * @throws TimeoutException if timeout
   */
  @Override
  public void shutdown() throws Exception {
    try {
      writeTrackingFile();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] failed to write to tracking file");
    }
    try {
      subscription.unsubscribe();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] failed to unsubscribe from NATS channel");
    }
    subscription = null;
    client.shutdown();
    super.shutdown();
  }

  /**
   * Writes pertinent configuration information to tracking file
   * @throws Exception if error occurs
   */
  public void writeTrackingFile() throws Exception {
    JsonObject json = Json.createObjectBuilder()
      .add(NATSClient.SERVER_HOST_PROPERTY,client.getServerHost())
      .add(NATSClient.SERVER_PORT_PROPERTY,client.getServerPort())
      .add(NATSClient.CLUSTER_ID_PROPERTY,client.getClusterId())
      .add(NATSClient.CLIENT_ID_PROPERTY,client.getClientId())
      .add(NATSClient.SUBJECT_PROPERTY,subject)
      .add(SEQUENCE_PROPERTY,sequence)
    .build();

    FileUtils.writeFileThenMove(
      new File(trackingFileName + "_tmp"),
      new File(trackingFileName),
      json.toString().getBytes());
  }

  /**
   * Reads contents of tracking file
   *
   * @return JsonObject containing tracking file contents, or null if file doesn't exist
   * @throws Exception if error occurs
   */
  public JsonObject readTrackingFile() throws Exception {
    JsonObject json = null;

    File trackingFile = new File(trackingFileName);
    if (trackingFile.exists()) {
      InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
      JsonReader jsonReader = Json.createReader(contents);
      json = jsonReader.readObject();
      jsonReader.close();
    }
    return json;
  }

  /**
   * Defines behavior for message receipt. Attempts to process notifications, with configurable behavior
   * for exception handling
   *
   * @param message
   *            The message received from the STAN server
   */
  @Override
  public void onMessage(Message message) {
    try {
      // parse message, send to listeners
      URLNotification notification = URLNotificationJSONConverter.parseJSON(new ByteArrayInputStream(message.getData()));
      receiveNotification(notification);
      // update sequence and tracking file if exception not thrown or we still want to update sequence anyway
      if (!exceptionThrown || updateSequenceAfterException) {
        sequence = message.getSequence();
        writeTrackingFile();
      }
    } catch (Exception e) {
      exceptionThrown = true;
      LOGGER.log(Level.WARNING,
        "[" + getName() + "] exception handling NATSStreaming message." +
        (!updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess.":"") +
        " Stack Trace: " + e);
      LOGGER.log(Level.FINE, "[" + getName() + "] Message: " + message.getData());
    }
  }

  /** @return trackingFileName */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /** @param trackingFileName to set */
  public void setTrackingFileName(String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /** @return NATSClient */
  public NATSClient getClient() {
    return client;
  }

  /** @param client NATSClient to set */
  public void setClient(NATSClient client) {
    this.client = client;
  }

  /** @return subject */
  public String getSubject() {
    return subject;
  }

  /** @param subject to set */
  public void setSubject(String subject) {
    this.subject = subject;
  }

}