NATSStreamingNotificationReceiver.java

  1. package gov.usgs.earthquake.nats;

  2. import gov.usgs.earthquake.distribution.ConfigurationException;
  3. import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
  4. import gov.usgs.earthquake.distribution.URLNotification;
  5. import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
  6. import gov.usgs.util.Config;
  7. import gov.usgs.util.FileUtils;
  8. import io.nats.streaming.Message;
  9. import io.nats.streaming.MessageHandler;
  10. import io.nats.streaming.Subscription;
  11. import io.nats.streaming.SubscriptionOptions;

  12. import javax.json.Json;
  13. import javax.json.JsonObject;
  14. import javax.json.JsonReader;
  15. import java.io.ByteArrayInputStream;
  16. import java.io.File;
  17. import java.io.InputStream;
  18. import java.io.IOException;
  19. import java.util.concurrent.TimeoutException;
  20. import java.util.logging.Level;
  21. import java.util.logging.Logger;

  22. /**
  23.  * Connects directly to a NATS streaming server to receive notifications using a NATSClient
  24.  */
  25. public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {

  26.   private static final Logger LOGGER = Logger
  27.           .getLogger(DefaultNotificationReceiver.class.getName());

  28.   /** Property for tracking file name */
  29.   public static String TRACKING_FILE_NAME_PROPERTY = "trackingFile";
  30.   /** Property on if update sequence should occur after exception */
  31.   public static String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
  32.   /** Property for sequence */
  33.   public static String SEQUENCE_PROPERTY = "sequence";

  34.   /** Name of deafult tracking file */
  35.   public static String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
  36.   /** Default state of update after exception */
  37.   public static String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";

  38.   private NATSClient client = new NATSClient();
  39.   private Subscription subscription;

  40.   private String subject;
  41.   private long sequence = 0;
  42.   private String trackingFileName;
  43.   private boolean updateSequenceAfterException;
  44.   private boolean exceptionThrown = false;

  45.   /**
  46.    * Configures receiver based on included properties
  47.    *
  48.    * @param config
  49.    *            The user-defined configuration
  50.    *
  51.    * @throws Exception If required properties are ignored
  52.    */
  53.   @Override
  54.   public void configure(Config config) throws Exception {
  55.     super.configure(config);
  56.     client.configure(config);

  57.     subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
  58.     if (subject == null) {
  59.       throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
  60.     }

  61.     trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
  62.     updateSequenceAfterException = Boolean.parseBoolean(config.getProperty(
  63.       UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY,
  64.       DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
  65.   }

  66.   /**
  67.    * Does initial tracking file management and subscribes to server
  68.    * With a tracking file, gets the last sequence
  69.    *
  70.    * @throws InterruptedException if interrupted
  71.    * @throws IOException if IO error occurs
  72.    */
  73.   @Override
  74.   public void startup() throws Exception {
  75.     super.startup();

  76.     //Start client
  77.     client.startup();

  78.     //Check properties if tracking file exists
  79.     JsonObject properties = readTrackingFile();
  80.     if (properties != null &&
  81.         properties.getString(NATSClient.SERVER_HOST_PROPERTY).equals(client.getServerHost()) &&
  82.         properties.getString(NATSClient.SERVER_PORT_PROPERTY).equals(client.getServerPort()) &&
  83.         properties.getString(NATSClient.CLUSTER_ID_PROPERTY).equals(client.getClusterId()) &&
  84.         properties.getString(NATSClient.CLIENT_ID_PROPERTY).equals(client.getClientId()) &&
  85.         properties.getString(NATSClient.SUBJECT_PROPERTY).equals(subject)) {
  86.       sequence = Long.parseLong(properties.get(SEQUENCE_PROPERTY).toString());
  87.     }

  88.     subscription = client.getConnection().subscribe(
  89.       subject,
  90.       this,
  91.       new SubscriptionOptions.Builder().startAtSequence(sequence).build());
  92.     // Always starts at stored sequence; initialized to 0 and overwritten by storage
  93.   }

  94.   /**
  95.    * Closes subscription/connection and writes state in tracking file
  96.    * Wraps each statement in a try/catch to ensure each step still happens
  97.    *
  98.    * @throws IOException if IO error occurs
  99.    * @throws InterruptedException if interrupted
  100.    * @throws TimeoutException if timeout
  101.    */
  102.   @Override
  103.   public void shutdown() throws Exception {
  104.     try {
  105.       writeTrackingFile();
  106.     } catch (Exception e) {
  107.       LOGGER.log(Level.WARNING, "[" + getName() + "] failed to write to tracking file");
  108.     }
  109.     try {
  110.       subscription.unsubscribe();
  111.     } catch (Exception e) {
  112.       LOGGER.log(Level.WARNING, "[" + getName() + "] failed to unsubscribe from NATS channel");
  113.     }
  114.     subscription = null;
  115.     client.shutdown();
  116.     super.shutdown();
  117.   }

  118.   /**
  119.    * Writes pertinent configuration information to tracking file
  120.    * @throws Exception if error occurs
  121.    */
  122.   public void writeTrackingFile() throws Exception {
  123.     JsonObject json = Json.createObjectBuilder()
  124.       .add(NATSClient.SERVER_HOST_PROPERTY,client.getServerHost())
  125.       .add(NATSClient.SERVER_PORT_PROPERTY,client.getServerPort())
  126.       .add(NATSClient.CLUSTER_ID_PROPERTY,client.getClusterId())
  127.       .add(NATSClient.CLIENT_ID_PROPERTY,client.getClientId())
  128.       .add(NATSClient.SUBJECT_PROPERTY,subject)
  129.       .add(SEQUENCE_PROPERTY,sequence)
  130.     .build();

  131.     FileUtils.writeFileThenMove(
  132.       new File(trackingFileName + "_tmp"),
  133.       new File(trackingFileName),
  134.       json.toString().getBytes());
  135.   }

  136.   /**
  137.    * Reads contents of tracking file
  138.    *
  139.    * @return JsonObject containing tracking file contents, or null if file doesn't exist
  140.    * @throws Exception if error occurs
  141.    */
  142.   public JsonObject readTrackingFile() throws Exception {
  143.     JsonObject json = null;

  144.     File trackingFile = new File(trackingFileName);
  145.     if (trackingFile.exists()) {
  146.       InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
  147.       JsonReader jsonReader = Json.createReader(contents);
  148.       json = jsonReader.readObject();
  149.       jsonReader.close();
  150.     }
  151.     return json;
  152.   }

  153.   /**
  154.    * Defines behavior for message receipt. Attempts to process notifications, with configurable behavior
  155.    * for exception handling
  156.    *
  157.    * @param message
  158.    *            The message received from the STAN server
  159.    */
  160.   @Override
  161.   public void onMessage(Message message) {
  162.     try {
  163.       // parse message, send to listeners
  164.       URLNotification notification = URLNotificationJSONConverter.parseJSON(new ByteArrayInputStream(message.getData()));
  165.       receiveNotification(notification);
  166.       // update sequence and tracking file if exception not thrown or we still want to update sequence anyway
  167.       if (!exceptionThrown || updateSequenceAfterException) {
  168.         sequence = message.getSequence();
  169.         writeTrackingFile();
  170.       }
  171.     } catch (Exception e) {
  172.       exceptionThrown = true;
  173.       LOGGER.log(Level.WARNING,
  174.         "[" + getName() + "] exception handling NATSStreaming message." +
  175.         (!updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess.":"") +
  176.         " Stack Trace: " + e);
  177.       LOGGER.log(Level.FINE, "[" + getName() + "] Message: " + message.getData());
  178.     }
  179.   }

  180.   /** @return trackingFileName */
  181.   public String getTrackingFileName() {
  182.     return trackingFileName;
  183.   }

  184.   /** @param trackingFileName to set */
  185.   public void setTrackingFileName(String trackingFileName) {
  186.     this.trackingFileName = trackingFileName;
  187.   }

  188.   /** @return NATSClient */
  189.   public NATSClient getClient() {
  190.     return client;
  191.   }

  192.   /** @param client NATSClient to set */
  193.   public void setClient(NATSClient client) {
  194.     this.client = client;
  195.   }

  196.   /** @return subject */
  197.   public String getSubject() {
  198.     return subject;
  199.   }

  200.   /** @param subject to set */
  201.   public void setSubject(String subject) {
  202.     this.subject = subject;
  203.   }

  204. }