NATSStreamingNotificationReceiver.java
package gov.usgs.earthquake.nats;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Connects directly to a NATS streaming server to receive notifications using a NATSClient
*/
public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {
private static final Logger LOGGER = Logger
.getLogger(DefaultNotificationReceiver.class.getName());
/** Property for tracking file name */
public static String TRACKING_FILE_NAME_PROPERTY = "trackingFile";
/** Property on if update sequence should occur after exception */
public static String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
/** Property for sequence */
public static String SEQUENCE_PROPERTY = "sequence";
/** Name of deafult tracking file */
public static String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
/** Default state of update after exception */
public static String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";
private NATSClient client = new NATSClient();
private Subscription subscription;
private String subject;
private long sequence = 0;
private String trackingFileName;
private boolean updateSequenceAfterException;
private boolean exceptionThrown = false;
/**
* Configures receiver based on included properties
*
* @param config
* The user-defined configuration
*
* @throws Exception If required properties are ignored
*/
@Override
public void configure(Config config) throws Exception {
super.configure(config);
client.configure(config);
subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
if (subject == null) {
throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
}
trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
updateSequenceAfterException = Boolean.parseBoolean(config.getProperty(
UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY,
DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
}
/**
* Does initial tracking file management and subscribes to server
* With a tracking file, gets the last sequence
*
* @throws InterruptedException if interrupted
* @throws IOException if IO error occurs
*/
@Override
public void startup() throws Exception {
super.startup();
//Start client
client.startup();
//Check properties if tracking file exists
JsonObject properties = readTrackingFile();
if (properties != null &&
properties.getString(NATSClient.SERVER_HOST_PROPERTY).equals(client.getServerHost()) &&
properties.getString(NATSClient.SERVER_PORT_PROPERTY).equals(client.getServerPort()) &&
properties.getString(NATSClient.CLUSTER_ID_PROPERTY).equals(client.getClusterId()) &&
properties.getString(NATSClient.CLIENT_ID_PROPERTY).equals(client.getClientId()) &&
properties.getString(NATSClient.SUBJECT_PROPERTY).equals(subject)) {
sequence = Long.parseLong(properties.get(SEQUENCE_PROPERTY).toString());
}
subscription = client.getConnection().subscribe(
subject,
this,
new SubscriptionOptions.Builder().startAtSequence(sequence).build());
// Always starts at stored sequence; initialized to 0 and overwritten by storage
}
/**
* Closes subscription/connection and writes state in tracking file
* Wraps each statement in a try/catch to ensure each step still happens
*
* @throws IOException if IO error occurs
* @throws InterruptedException if interrupted
* @throws TimeoutException if timeout
*/
@Override
public void shutdown() throws Exception {
try {
writeTrackingFile();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] failed to write to tracking file");
}
try {
subscription.unsubscribe();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] failed to unsubscribe from NATS channel");
}
subscription = null;
client.shutdown();
super.shutdown();
}
/**
* Writes pertinent configuration information to tracking file
* @throws Exception if error occurs
*/
public void writeTrackingFile() throws Exception {
JsonObject json = Json.createObjectBuilder()
.add(NATSClient.SERVER_HOST_PROPERTY,client.getServerHost())
.add(NATSClient.SERVER_PORT_PROPERTY,client.getServerPort())
.add(NATSClient.CLUSTER_ID_PROPERTY,client.getClusterId())
.add(NATSClient.CLIENT_ID_PROPERTY,client.getClientId())
.add(NATSClient.SUBJECT_PROPERTY,subject)
.add(SEQUENCE_PROPERTY,sequence)
.build();
FileUtils.writeFileThenMove(
new File(trackingFileName + "_tmp"),
new File(trackingFileName),
json.toString().getBytes());
}
/**
* Reads contents of tracking file
*
* @return JsonObject containing tracking file contents, or null if file doesn't exist
* @throws Exception if error occurs
*/
public JsonObject readTrackingFile() throws Exception {
JsonObject json = null;
File trackingFile = new File(trackingFileName);
if (trackingFile.exists()) {
InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
JsonReader jsonReader = Json.createReader(contents);
json = jsonReader.readObject();
jsonReader.close();
}
return json;
}
/**
* Defines behavior for message receipt. Attempts to process notifications, with configurable behavior
* for exception handling
*
* @param message
* The message received from the STAN server
*/
@Override
public void onMessage(Message message) {
try {
// parse message, send to listeners
URLNotification notification = URLNotificationJSONConverter.parseJSON(new ByteArrayInputStream(message.getData()));
receiveNotification(notification);
// update sequence and tracking file if exception not thrown or we still want to update sequence anyway
if (!exceptionThrown || updateSequenceAfterException) {
sequence = message.getSequence();
writeTrackingFile();
}
} catch (Exception e) {
exceptionThrown = true;
LOGGER.log(Level.WARNING,
"[" + getName() + "] exception handling NATSStreaming message." +
(!updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess.":"") +
" Stack Trace: " + e);
LOGGER.log(Level.FINE, "[" + getName() + "] Message: " + message.getData());
}
}
/** @return trackingFileName */
public String getTrackingFileName() {
return trackingFileName;
}
/** @param trackingFileName to set */
public void setTrackingFileName(String trackingFileName) {
this.trackingFileName = trackingFileName;
}
/** @return NATSClient */
public NATSClient getClient() {
return client;
}
/** @param client NATSClient to set */
public void setClient(NATSClient client) {
this.client = client;
}
/** @return subject */
public String getSubject() {
return subject;
}
/** @param subject to set */
public void setSubject(String subject) {
this.subject = subject;
}
}