NATSStreamingNotificationSender.java

package gov.usgs.earthquake.nats;

import gov.usgs.earthquake.distribution.*;
import gov.usgs.util.Config;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sends notifications directly to NATS streaming server using a NATS client
 */
public class NATSStreamingNotificationSender extends DefaultNotificationSender {

  private static final Logger LOGGER = Logger
          .getLogger(DefaultNotificationSender.class.getName());

  private NATSClient client = new NATSClient();
  private String subject;

  @Override
  public void configure(Config config) throws Exception{
    super.configure(config);
    client.configure(config);
    subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
    if (subject == null) {
      throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
    }
  }

  /**
   * Publishes notification to NATS streaming server
   *
   * @param notification
   *            The notification to send
   * @throws Exception if something goes wrong with publish
   */
  @Override
  public void sendNotification(final Notification notification) throws Exception {
    String message = URLNotificationJSONConverter.toJSON((URLNotification) notification);
    try {
      client.getConnection().publish(subject, message.getBytes());
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception publishing NATSStreaming notification:");
      throw e;
    }
  }

  /**
   * Starts NATSStreaming connection and superclasses
   *
   * @throws Exception if there's an issue with superclasses, generating a client ID, or connecting to server
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    client.startup();
  }

  /**
   * Safely closes the NATSStreaming connection and superclasses
   *
   * @throws Exception if superclasses throw exceptions
   */
  @Override
  public void shutdown() throws Exception {
    client.shutdown();
    super.shutdown();
  }

  /** @return NATSClient */
  public NATSClient getClient() {
    return client;
  }

  /** @param client NATSClient to set */
  public void setClient(NATSClient client) {
    this.client = client;
  }

  /** @return subject */
  public String getSubject() {
    return subject;
  }

  /** @param subject to set */
  public void setSubject(String subject) {
    this.subject = subject;
  }
}