NatsStreamingConnectionTest.java

  1. package gov.usgs.earthquake.nats;

  2. import io.nats.streaming.*;

  3. import java.util.concurrent.CountDownLatch;

  4. public class NatsStreamingConnectionTest {

  5.   /**
  6.    * Tests the connection of NATS streaming server
  7.    * Code lifted from https://github.com/nats-io/stan.java
  8.    *
  9.    * @param args
  10.    *          Input arguments
  11.    *
  12.    * @throws Exception When something goes wrong
  13.    */
  14.   public static void main(String[] args) throws Exception{
  15.     // Connection factory
  16.     // Produces streaming connections
  17.     // Note: Should not make multiple factories; this should really only be done once
  18.     StreamingConnectionFactory factory = new StreamingConnectionFactory("test-cluster","test-client");

  19.     // Connection
  20.     // It looks like we shouldn't be producing a bunch of NATS connections; instead, we pass them in to new streaming connections
  21.     // Look into this in the future
  22.     StreamingConnection connection = factory.createConnection(); //Exception just tossed out of main

  23.     System.out.println("NATS Streaming Connection Test\n");

  24.     // Publish blocks until we get a response from the server
  25.     // In the future, we might want to use some more intelligent logic (with a time out)
  26.     // Basically, if we can't hit the server, it's down and that's not good
  27.     System.out.println("Trying to publish message...");
  28.     try {
  29.       connection.publish("test-subject", "test-data".getBytes());
  30.       System.out.println("Message published");
  31.     } catch (Exception e) {
  32.       System.err.println("Unable to store message in NATS Streaming. Exception:");
  33.       throw e;
  34.     }

  35.     // Creating countdown latch to wait for subscriber message receipt
  36.     // This lets this thread wait until an operation in other threads finishes
  37.     final CountDownLatch doneSignal = new CountDownLatch(1);

  38.     // Asynchronous subscriber receiving messages
  39.     // Evidently listens in another thread (that is what asynchronous means)
  40.     Subscription subscription = connection.subscribe("test-subject", new MessageHandler() {
  41.       public void onMessage(Message m) {
  42.         System.out.printf("Received a message: %s\n", new String(m.getData()));
  43.         doneSignal.countDown();
  44.       }
  45.     }, new SubscriptionOptions.Builder().deliverAllAvailable().build());

  46.     // Wait for countdown latch
  47.     doneSignal.await();

  48.     // Unsubscribe for cleanup (is this necessary?)
  49.     subscription.unsubscribe();

  50.     // Close connection
  51.     connection.close();

  52.   }
  53. }