NatsStreamingConnectionTest.java
package gov.usgs.earthquake.nats;
import io.nats.streaming.*;
import java.util.concurrent.CountDownLatch;
public class NatsStreamingConnectionTest {
/**
* Tests the connection of NATS streaming server
* Code lifted from https://github.com/nats-io/stan.java
*
* @param args
* Input arguments
*
* @throws Exception When something goes wrong
*/
public static void main(String[] args) throws Exception{
// Connection factory
// Produces streaming connections
// Note: Should not make multiple factories; this should really only be done once
StreamingConnectionFactory factory = new StreamingConnectionFactory("test-cluster","test-client");
// Connection
// It looks like we shouldn't be producing a bunch of NATS connections; instead, we pass them in to new streaming connections
// Look into this in the future
StreamingConnection connection = factory.createConnection(); //Exception just tossed out of main
System.out.println("NATS Streaming Connection Test\n");
// Publish blocks until we get a response from the server
// In the future, we might want to use some more intelligent logic (with a time out)
// Basically, if we can't hit the server, it's down and that's not good
System.out.println("Trying to publish message...");
try {
connection.publish("test-subject", "test-data".getBytes());
System.out.println("Message published");
} catch (Exception e) {
System.err.println("Unable to store message in NATS Streaming. Exception:");
throw e;
}
// Creating countdown latch to wait for subscriber message receipt
// This lets this thread wait until an operation in other threads finishes
final CountDownLatch doneSignal = new CountDownLatch(1);
// Asynchronous subscriber receiving messages
// Evidently listens in another thread (that is what asynchronous means)
Subscription subscription = connection.subscribe("test-subject", new MessageHandler() {
public void onMessage(Message m) {
System.out.printf("Received a message: %s\n", new String(m.getData()));
doneSignal.countDown();
}
}, new SubscriptionOptions.Builder().deliverAllAvailable().build());
// Wait for countdown latch
doneSignal.await();
// Unsubscribe for cleanup (is this necessary?)
subscription.unsubscribe();
// Close connection
connection.close();
}
}