NatsStreamingConnectionTest.java
- package gov.usgs.earthquake.nats;
- import io.nats.streaming.*;
- import java.util.concurrent.CountDownLatch;
- public class NatsStreamingConnectionTest {
- /**
- * Tests the connection of NATS streaming server
- * Code lifted from https://github.com/nats-io/stan.java
- *
- * @param args
- * Input arguments
- *
- * @throws Exception When something goes wrong
- */
- public static void main(String[] args) throws Exception{
- // Connection factory
- // Produces streaming connections
- // Note: Should not make multiple factories; this should really only be done once
- StreamingConnectionFactory factory = new StreamingConnectionFactory("test-cluster","test-client");
- // Connection
- // It looks like we shouldn't be producing a bunch of NATS connections; instead, we pass them in to new streaming connections
- // Look into this in the future
- StreamingConnection connection = factory.createConnection(); //Exception just tossed out of main
- System.out.println("NATS Streaming Connection Test\n");
- // Publish blocks until we get a response from the server
- // In the future, we might want to use some more intelligent logic (with a time out)
- // Basically, if we can't hit the server, it's down and that's not good
- System.out.println("Trying to publish message...");
- try {
- connection.publish("test-subject", "test-data".getBytes());
- System.out.println("Message published");
- } catch (Exception e) {
- System.err.println("Unable to store message in NATS Streaming. Exception:");
- throw e;
- }
- // Creating countdown latch to wait for subscriber message receipt
- // This lets this thread wait until an operation in other threads finishes
- final CountDownLatch doneSignal = new CountDownLatch(1);
- // Asynchronous subscriber receiving messages
- // Evidently listens in another thread (that is what asynchronous means)
- Subscription subscription = connection.subscribe("test-subject", new MessageHandler() {
- public void onMessage(Message m) {
- System.out.printf("Received a message: %s\n", new String(m.getData()));
- doneSignal.countDown();
- }
- }, new SubscriptionOptions.Builder().deliverAllAvailable().build());
- // Wait for countdown latch
- doneSignal.await();
- // Unsubscribe for cleanup (is this necessary?)
- subscription.unsubscribe();
- // Close connection
- connection.close();
- }
- }