// NATSClient.java

package gov.usgs.earthquake.nats;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.util.Config;
import gov.usgs.util.Configurable;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Holds the properties needed to connect to a NATS Streaming server and
 * manages the resulting {@link StreamingConnection}.
 *
 * <p>Each instance owns at most one connection, so several instances can be
 * created to open several concurrent connections.
 *
 * <p>Typical lifecycle: construct (or {@link #configure(Config)}), then
 * {@link #startup()}, use {@link #getConnection()}, then {@link #shutdown()}.
 */
public class NATSClient implements Configurable {

  /** Logger object */
  public static Logger LOGGER  = Logger
          .getLogger(NATSClient.class.getName());

  /** Property for server host */
  public static String SERVER_HOST_PROPERTY = "serverHost";
  /** Property for server port */
  public static String SERVER_PORT_PROPERTY = "serverPort";
  /** Property for cluster id */
  public static String CLUSTER_ID_PROPERTY = "clusterId";
  /** Property for client id */
  public static String CLIENT_ID_PROPERTY = "clientId";
  /** Property for subject */
  public static String SUBJECT_PROPERTY = "subject";

  private String serverHost;
  private String serverPort;
  private String clusterId;
  /** Client id; when still null at startup, one is generated. */
  private String clientId;
  /** Suffix appended to a generated client id. */
  private String clientIdSuffix;

  /** Open connection, or null when not started / already shut down. */
  private StreamingConnection connection;

  /**
   * Constructor for testing.
   *
   * Uses localhost:4222, cluster "test-cluster", and the current thread id
   * as the client id suffix.
   */
  public NATSClient() {
    this("localhost","4222","test-cluster",Long.toString(Thread.currentThread().getId()));
  }

  /**
   * Custom constructor
   * @param serverHost String of host
   * @param serverPort String of port
   * @param clusterId String of clusterID
   * @param clientIdSuffix String of idSuffix
   */
  public NATSClient(String serverHost, String serverPort, String clusterId, String clientIdSuffix) {
    // The client id itself is not set here: unless one is supplied through
    // configure() or setClientId(), startup() generates it and appends the
    // suffix given here.
    this.serverHost = serverHost;
    this.serverPort = serverPort;
    this.clusterId = clusterId;
    this.clientIdSuffix = clientIdSuffix;
  }

  /**
   * Configures the required and optional parameters to connect to NATS Streaming server
   *
   * <p>Server host, server port and cluster id are required; client id is
   * optional and is generated during {@link #startup()} when absent.
   *
   * @param config
   *            the Config to load.
   * @throws Exception if a required parameter is missing
   */
  @Override
  public void configure(Config config) throws Exception {
    // required parameters
    serverHost = requireProperty(config, SERVER_HOST_PROPERTY);
    serverPort = requireProperty(config, SERVER_PORT_PROPERTY);
    clusterId = requireProperty(config, CLUSTER_ID_PROPERTY);

    // optional parameter; may be null
    clientId = config.getProperty(CLIENT_ID_PROPERTY);
  }

  /**
   * Reads a property that must be present in the configuration.
   *
   * @param config
   *            the Config to read from
   * @param key
   *            name of the required property
   * @return the property value, never null
   * @throws ConfigurationException if the property is not set
   */
  private static String requireProperty(Config config, String key)
      throws ConfigurationException {
    String value = config.getProperty(key);
    if (value == null) {
      throw new ConfigurationException(key + " is a required parameter");
    }
    return value;
  }

  /**
   * Starts connection to NATS streaming server
   * @throws Exception If something goes wrong when connecting to NATS streaming server
   */
  @Override
  public void startup() throws Exception {
    // generate client ID if we don't have one
    if (clientId == null) {
      clientId = generateClientId(clientIdSuffix);
    }

    // create connection
    StreamingConnectionFactory factory = new StreamingConnectionFactory(clusterId,clientId);
    factory.setNatsUrl("nats://" + serverHost + ":" + serverPort);
    connection = factory.createConnection();
  }

  /**
   * Safely closes connection to NATS Streaming server
   *
   * <p>Does nothing when there is no connection. A failure to close is
   * logged rather than thrown, and the connection reference is cleared
   * either way.
   */
  @Override
  public void shutdown() {
    if (connection == null) {
      // never started, or already shut down
      return;
    }
    try {
      connection.close();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] failed to close NATS connection", e);
    } finally {
      connection = null;
    }
  }

  /**
   * Creates a client ID based on the host IP and MAC address
   *
   * <p>The ID has the form {@code <ip with '.' replaced by '-'>_<base64 of
   * SHA-1 of MAC>_<suffix>}.
   *
   * @param suffix
   *    Suffix to add to generated ID
   * @return clientId
   * @throws Exception if there's an issue accessing IP or MAC addresses, or can't do sha1 hash
   */
  private static String generateClientId(String suffix) throws Exception{
    // get mac address
    InetAddress host = InetAddress.getLocalHost();
    NetworkInterface net = NetworkInterface.getByInetAddress(host);
    if (net == null) {
      // getByInetAddress returns null when no interface is bound to the address
      throw new IllegalStateException(
          "No network interface found for local host address " + host.getHostAddress());
    }
    byte[] macRaw = net.getHardwareAddress();
    if (macRaw == null) {
      // getHardwareAddress returns null when the address is missing or inaccessible
      throw new IllegalStateException(
          "No hardware address available for network interface " + net.getName());
    }

    // do a sha1 hash
    byte[] hashed = MessageDigest.getInstance("SHA-1").digest(macRaw);
    String sha1 = Base64.getEncoder().encodeToString(hashed);

    // create client id
    // NOTE(review): standard Base64 output can contain '+', '/' and '=', and
    // an IPv6 host address contains ':'; confirm the server accepts these
    // characters in client ids before relying on generated ids.
    return host.getHostAddress().replace('.','-') + '_' + sha1+ '_' + suffix;
  }

  /** @return the fully-qualified class name; this object has no configurable name */
  @Override
  public String getName() {
    return NATSClient.class.getName();
  }

  /**
   * Ignored; the name is always the class name.
   *
   * @param string unused
   */
  @Override
  public void setName(String string) {

  }

  /** @return serverHost */
  public String getServerHost() {
    return serverHost;
  }

  /** @param serverHost to set */
  public void setServerHost(String serverHost) {
    this.serverHost = serverHost;
  }

  /** @return serverPort */
  public String getServerPort() {
    return serverPort;
  }

  /** @param serverPort to set */
  public void setServerPort(String serverPort) {
    this.serverPort = serverPort;
  }

  /** @return clusterID */
  public String getClusterId() {
    return clusterId;
  }

  /** @param clusterId to set */
  public void setClusterId(String clusterId) {
    this.clusterId = clusterId;
  }

  /** @return clientID, or null if none was set and startup has not generated one */
  public String getClientId() {
    return clientId;
  }

  /** @param clientId to set */
  public void setClientId(String clientId) {
    this.clientId = clientId;
  }

  /** @return StreamingConnection, or null when not connected */
  public StreamingConnection getConnection() {
    return connection;
  }

  /** @param connection StreamingConnection to set */
  public void setConnection(StreamingConnection connection) {
    this.connection = connection;
  }
}