NATSClient.java

  1. package gov.usgs.earthquake.nats;

  2. import gov.usgs.earthquake.distribution.ConfigurationException;
  3. import gov.usgs.util.Config;
  4. import gov.usgs.util.Configurable;
  5. import io.nats.streaming.StreamingConnection;
  6. import io.nats.streaming.StreamingConnectionFactory;

  7. import java.net.InetAddress;
  8. import java.net.NetworkInterface;
  9. import java.security.MessageDigest;
  10. import java.util.Base64;
  11. import java.util.logging.Level;
  12. import java.util.logging.Logger;

  13. /**
  14.  * Manages conserved NATS Streaming connection properties.
  15.  * Written so several concurrent connections can be created.
  16.  */
  17. public class NATSClient implements Configurable {

  18.   /** Logger object */
  19.   public static Logger LOGGER  = Logger
  20.           .getLogger(NATSClient.class.getName());

  21.   /** Property for server host */
  22.   public static String SERVER_HOST_PROPERTY = "serverHost";
  23.   /** Property for server port */
  24.   public static String SERVER_PORT_PROPERTY = "serverPort";
  25.   /** Property for cluster id */
  26.   public static String CLUSTER_ID_PROPERTY = "clusterId";
  27.   /** Property for client id */
  28.   public static String CLIENT_ID_PROPERTY = "clientId";
  29.   /** Property for subject */
  30.   public static String SUBJECT_PROPERTY = "subject";

  31.   private String serverHost;
  32.   private String serverPort;
  33.   private String clusterId;
  34.   private String clientId;
  35.   private String clientIdSuffix;

  36.   private StreamingConnection connection;

  37.   /** Constructor for testing*/
  38.   public NATSClient() {
  39.     this("localhost","4222","test-cluster",Long.toString(Thread.currentThread().getId()));
  40.   }

  41.   /**
  42.    * Custom constructor
  43.    * @param serverHost String of host
  44.    * @param serverPort String of port
  45.    * @param clusterId String of clusterID
  46.    * @param clientIdSuffix String of idSuffix
  47.    */
  48.   public NATSClient(String serverHost, String serverPort, String clusterId, String clientIdSuffix) {
  49.     // try to generate a unique ID; use suffix only if fail
  50.     this.serverHost = serverHost;
  51.     this.serverPort = serverPort;
  52.     this.clusterId = clusterId;
  53.     this.clientIdSuffix = clientIdSuffix;
  54.   }

  55.   /**
  56.    * Configures the required and optional parameters to connect to NATS Streaming server
  57.    *
  58.    * @param config
  59.    *            the Config to load.
  60.    * @throws Exception if error occurs
  61.    */
  62.   @Override
  63.   public void configure(Config config) throws Exception {
  64.     // required parameters
  65.     serverHost = config.getProperty(SERVER_HOST_PROPERTY);
  66.     if (serverHost == null) {
  67.       throw new ConfigurationException(SERVER_HOST_PROPERTY + " is a required parameter");
  68.     }

  69.     serverPort = config.getProperty(SERVER_PORT_PROPERTY);
  70.     if (serverHost == null) {
  71.       throw new ConfigurationException(SERVER_PORT_PROPERTY + " is a required parameter");
  72.     }

  73.     clusterId = config.getProperty(CLUSTER_ID_PROPERTY);
  74.     if (serverHost == null) {
  75.       throw new ConfigurationException(CLUSTER_ID_PROPERTY + " is a required parameter");
  76.     }

  77.     clientId = config.getProperty(CLIENT_ID_PROPERTY);
  78.   }

  79.   /**
  80.    * Starts connection to NATS streaming server
  81.    * @throws Exception If something goes wrong when connecting to NATS streaming server
  82.    */
  83.   @Override
  84.   public void startup() throws Exception {
  85.     // generate client ID if we don't have one
  86.     if (clientId == null) {
  87.       clientId = generateClientId(clientIdSuffix);
  88.     }

  89.     // create connection
  90.     StreamingConnectionFactory factory = new StreamingConnectionFactory(clusterId,clientId);
  91.     factory.setNatsUrl("nats://" + serverHost + ":" + serverPort);
  92.     connection = factory.createConnection();
  93.   }

  94.   /**
  95.    * Safely closes connection to NATS Streaming server
  96.    */
  97.   @Override
  98.   public void shutdown() {
  99.     try {
  100.       connection.close();
  101.     } catch (Exception e) {
  102.       LOGGER.log(Level.WARNING, "[" + getName() + "] failed to close NATS connection");
  103.     }
  104.     connection = null;
  105.   }

  106.   /**
  107.    * Creates a client ID based on the host IP and MAC address
  108.    *
  109.    * @param suffix
  110.    *    Suffix to add to generated ID
  111.    * @return clientId
  112.    * @throws Exception if there's an issue accessing IP or MAC addresses, or can't do sha1 hash
  113.    */
  114.   private static String generateClientId(String suffix) throws Exception{
  115.     // get mac address
  116.     InetAddress host = InetAddress.getLocalHost();
  117.     NetworkInterface net = NetworkInterface.getByInetAddress(host);
  118.     byte[] macRaw =  net.getHardwareAddress();

  119.     // do a sha1 hash
  120.     MessageDigest digest = MessageDigest.getInstance("SHA-1");
  121.     digest.reset();
  122.     digest.update(macRaw);
  123.     String sha1 = Base64.getEncoder().encodeToString(digest.digest());

  124.     // create client id
  125.     String clientId = host.getHostAddress().replace('.','-') + '_' + sha1+ '_' + suffix;

  126.     return clientId;
  127.   }

  128.   @Override
  129.   public String getName() {
  130.     return NATSClient.class.getName();
  131.   }

  132.   @Override
  133.   public void setName(String string) {

  134.   }

  135.   /** @return serverHost */
  136.   public String getServerHost() {
  137.     return serverHost;
  138.   }

  139.   /** @param serverHost to set */
  140.   public void setServerHost(String serverHost) {
  141.     this.serverHost = serverHost;
  142.   }

  143.   /** @return serverPort */
  144.   public String getServerPort() {
  145.     return serverPort;
  146.   }

  147.   /** @param serverPort to set */
  148.   public void setServerPort(String serverPort) {
  149.     this.serverPort = serverPort;
  150.   }

  151.   /** @return clusterID */
  152.   public String getClusterId() {
  153.     return clusterId;
  154.   }

  155.   /** @param clusterId to set */
  156.   public void setClusterId(String clusterId) {
  157.     this.clusterId = clusterId;
  158.   }

  159.   /** @return clientID */
  160.   public String getClientId() {
  161.     return clientId;
  162.   }

  163.   /** @param clientId to set */
  164.   public void setClientId(String clientId) {
  165.     this.clientId = clientId;
  166.   }

  167.   /** @return StreamingConnection */
  168.   public StreamingConnection getConnection() {
  169.     return connection;
  170.   }

  171.   /** @param connection StreamingConnection to set */
  172.   public void setConnection(StreamingConnection connection) {
  173.     this.connection = connection;
  174.   }
  175. }