NATSStreamingNotificationReceiver.java
- package gov.usgs.earthquake.nats;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
- import gov.usgs.earthquake.distribution.URLNotification;
- import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
- import gov.usgs.util.Config;
- import gov.usgs.util.FileUtils;
- import io.nats.streaming.Message;
- import io.nats.streaming.MessageHandler;
- import io.nats.streaming.Subscription;
- import io.nats.streaming.SubscriptionOptions;
- import javax.json.Json;
- import javax.json.JsonObject;
- import javax.json.JsonReader;
- import java.io.ByteArrayInputStream;
- import java.io.File;
- import java.io.InputStream;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Connects directly to a NATS streaming server to receive notifications using a NATSClient
- */
- public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {
- private static final Logger LOGGER = Logger
- .getLogger(DefaultNotificationReceiver.class.getName());
- /** Property for tracking file name */
- public static String TRACKING_FILE_NAME_PROPERTY = "trackingFile";
- /** Property on if update sequence should occur after exception */
- public static String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
- /** Property for sequence */
- public static String SEQUENCE_PROPERTY = "sequence";
- /** Name of deafult tracking file */
- public static String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
- /** Default state of update after exception */
- public static String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";
- private NATSClient client = new NATSClient();
- private Subscription subscription;
- private String subject;
- private long sequence = 0;
- private String trackingFileName;
- private boolean updateSequenceAfterException;
- private boolean exceptionThrown = false;
- /**
- * Configures receiver based on included properties
- *
- * @param config
- * The user-defined configuration
- *
- * @throws Exception If required properties are ignored
- */
- @Override
- public void configure(Config config) throws Exception {
- super.configure(config);
- client.configure(config);
- subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
- if (subject == null) {
- throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
- }
- trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
- updateSequenceAfterException = Boolean.parseBoolean(config.getProperty(
- UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY,
- DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
- }
- /**
- * Does initial tracking file management and subscribes to server
- * With a tracking file, gets the last sequence
- *
- * @throws InterruptedException if interrupted
- * @throws IOException if IO error occurs
- */
- @Override
- public void startup() throws Exception {
- super.startup();
- //Start client
- client.startup();
- //Check properties if tracking file exists
- JsonObject properties = readTrackingFile();
- if (properties != null &&
- properties.getString(NATSClient.SERVER_HOST_PROPERTY).equals(client.getServerHost()) &&
- properties.getString(NATSClient.SERVER_PORT_PROPERTY).equals(client.getServerPort()) &&
- properties.getString(NATSClient.CLUSTER_ID_PROPERTY).equals(client.getClusterId()) &&
- properties.getString(NATSClient.CLIENT_ID_PROPERTY).equals(client.getClientId()) &&
- properties.getString(NATSClient.SUBJECT_PROPERTY).equals(subject)) {
- sequence = Long.parseLong(properties.get(SEQUENCE_PROPERTY).toString());
- }
- subscription = client.getConnection().subscribe(
- subject,
- this,
- new SubscriptionOptions.Builder().startAtSequence(sequence).build());
- // Always starts at stored sequence; initialized to 0 and overwritten by storage
- }
- /**
- * Closes subscription/connection and writes state in tracking file
- * Wraps each statement in a try/catch to ensure each step still happens
- *
- * @throws IOException if IO error occurs
- * @throws InterruptedException if interrupted
- * @throws TimeoutException if timeout
- */
- @Override
- public void shutdown() throws Exception {
- try {
- writeTrackingFile();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName() + "] failed to write to tracking file");
- }
- try {
- subscription.unsubscribe();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName() + "] failed to unsubscribe from NATS channel");
- }
- subscription = null;
- client.shutdown();
- super.shutdown();
- }
- /**
- * Writes pertinent configuration information to tracking file
- * @throws Exception if error occurs
- */
- public void writeTrackingFile() throws Exception {
- JsonObject json = Json.createObjectBuilder()
- .add(NATSClient.SERVER_HOST_PROPERTY,client.getServerHost())
- .add(NATSClient.SERVER_PORT_PROPERTY,client.getServerPort())
- .add(NATSClient.CLUSTER_ID_PROPERTY,client.getClusterId())
- .add(NATSClient.CLIENT_ID_PROPERTY,client.getClientId())
- .add(NATSClient.SUBJECT_PROPERTY,subject)
- .add(SEQUENCE_PROPERTY,sequence)
- .build();
- FileUtils.writeFileThenMove(
- new File(trackingFileName + "_tmp"),
- new File(trackingFileName),
- json.toString().getBytes());
- }
- /**
- * Reads contents of tracking file
- *
- * @return JsonObject containing tracking file contents, or null if file doesn't exist
- * @throws Exception if error occurs
- */
- public JsonObject readTrackingFile() throws Exception {
- JsonObject json = null;
- File trackingFile = new File(trackingFileName);
- if (trackingFile.exists()) {
- InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
- JsonReader jsonReader = Json.createReader(contents);
- json = jsonReader.readObject();
- jsonReader.close();
- }
- return json;
- }
- /**
- * Defines behavior for message receipt. Attempts to process notifications, with configurable behavior
- * for exception handling
- *
- * @param message
- * The message received from the STAN server
- */
- @Override
- public void onMessage(Message message) {
- try {
- // parse message, send to listeners
- URLNotification notification = URLNotificationJSONConverter.parseJSON(new ByteArrayInputStream(message.getData()));
- receiveNotification(notification);
- // update sequence and tracking file if exception not thrown or we still want to update sequence anyway
- if (!exceptionThrown || updateSequenceAfterException) {
- sequence = message.getSequence();
- writeTrackingFile();
- }
- } catch (Exception e) {
- exceptionThrown = true;
- LOGGER.log(Level.WARNING,
- "[" + getName() + "] exception handling NATSStreaming message." +
- (!updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess.":"") +
- " Stack Trace: " + e);
- LOGGER.log(Level.FINE, "[" + getName() + "] Message: " + message.getData());
- }
- }
- /** @return trackingFileName */
- public String getTrackingFileName() {
- return trackingFileName;
- }
- /** @param trackingFileName to set */
- public void setTrackingFileName(String trackingFileName) {
- this.trackingFileName = trackingFileName;
- }
- /** @return NATSClient */
- public NATSClient getClient() {
- return client;
- }
- /** @param client NATSClient to set */
- public void setClient(NATSClient client) {
- this.client = client;
- }
- /** @return subject */
- public String getSubject() {
- return subject;
- }
- /** @param subject to set */
- public void setSubject(String subject) {
- this.subject = subject;
- }
- }