NATSClient.java
package gov.usgs.earthquake.nats;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.util.Config;
import gov.usgs.util.Configurable;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Manages conserved NATS Streaming connection properties.
* Written so several concurrent connections can be created.
*/
public class NATSClient implements Configurable {
/** Logger object */
public static Logger LOGGER = Logger
.getLogger(NATSClient.class.getName());
/** Property for server host */
public static String SERVER_HOST_PROPERTY = "serverHost";
/** Property for server port */
public static String SERVER_PORT_PROPERTY = "serverPort";
/** Property for cluster id */
public static String CLUSTER_ID_PROPERTY = "clusterId";
/** Property for client id */
public static String CLIENT_ID_PROPERTY = "clientId";
/** Property for subject */
public static String SUBJECT_PROPERTY = "subject";
private String serverHost;
private String serverPort;
private String clusterId;
private String clientId;
private String clientIdSuffix;
private StreamingConnection connection;
/** Constructor for testing*/
public NATSClient() {
this("localhost","4222","test-cluster",Long.toString(Thread.currentThread().getId()));
}
/**
* Custom constructor
* @param serverHost String of host
* @param serverPort String of port
* @param clusterId String of clusterID
* @param clientIdSuffix String of idSuffix
*/
public NATSClient(String serverHost, String serverPort, String clusterId, String clientIdSuffix) {
// try to generate a unique ID; use suffix only if fail
this.serverHost = serverHost;
this.serverPort = serverPort;
this.clusterId = clusterId;
this.clientIdSuffix = clientIdSuffix;
}
/**
* Configures the required and optional parameters to connect to NATS Streaming server
*
* @param config
* the Config to load.
* @throws Exception if error occurs
*/
@Override
public void configure(Config config) throws Exception {
// required parameters
serverHost = config.getProperty(SERVER_HOST_PROPERTY);
if (serverHost == null) {
throw new ConfigurationException(SERVER_HOST_PROPERTY + " is a required parameter");
}
serverPort = config.getProperty(SERVER_PORT_PROPERTY);
if (serverHost == null) {
throw new ConfigurationException(SERVER_PORT_PROPERTY + " is a required parameter");
}
clusterId = config.getProperty(CLUSTER_ID_PROPERTY);
if (serverHost == null) {
throw new ConfigurationException(CLUSTER_ID_PROPERTY + " is a required parameter");
}
clientId = config.getProperty(CLIENT_ID_PROPERTY);
}
/**
* Starts connection to NATS streaming server
* @throws Exception If something goes wrong when connecting to NATS streaming server
*/
@Override
public void startup() throws Exception {
// generate client ID if we don't have one
if (clientId == null) {
clientId = generateClientId(clientIdSuffix);
}
// create connection
StreamingConnectionFactory factory = new StreamingConnectionFactory(clusterId,clientId);
factory.setNatsUrl("nats://" + serverHost + ":" + serverPort);
connection = factory.createConnection();
}
/**
* Safely closes connection to NATS Streaming server
*/
@Override
public void shutdown() {
try {
connection.close();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] failed to close NATS connection");
}
connection = null;
}
/**
* Creates a client ID based on the host IP and MAC address
*
* @param suffix
* Suffix to add to generated ID
* @return clientId
* @throws Exception if there's an issue accessing IP or MAC addresses, or can't do sha1 hash
*/
private static String generateClientId(String suffix) throws Exception{
// get mac address
InetAddress host = InetAddress.getLocalHost();
NetworkInterface net = NetworkInterface.getByInetAddress(host);
byte[] macRaw = net.getHardwareAddress();
// do a sha1 hash
MessageDigest digest = MessageDigest.getInstance("SHA-1");
digest.reset();
digest.update(macRaw);
String sha1 = Base64.getEncoder().encodeToString(digest.digest());
// create client id
String clientId = host.getHostAddress().replace('.','-') + '_' + sha1+ '_' + suffix;
return clientId;
}
@Override
public String getName() {
return NATSClient.class.getName();
}
@Override
public void setName(String string) {
}
/** @return serverHost */
public String getServerHost() {
return serverHost;
}
/** @param serverHost to set */
public void setServerHost(String serverHost) {
this.serverHost = serverHost;
}
/** @return serverPort */
public String getServerPort() {
return serverPort;
}
/** @param serverPort to set */
public void setServerPort(String serverPort) {
this.serverPort = serverPort;
}
/** @return clusterID */
public String getClusterId() {
return clusterId;
}
/** @param clusterId to set */
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
/** @return clientID */
public String getClientId() {
return clientId;
}
/** @param clientId to set */
public void setClientId(String clientId) {
this.clientId = clientId;
}
/** @return StreamingConnection */
public StreamingConnection getConnection() {
return connection;
}
/** @param connection StreamingConnection to set */
public void setConnection(StreamingConnection connection) {
this.connection = connection;
}
}