NATSStreamingNotificationSender.java

  1. package gov.usgs.earthquake.nats;

  2. import gov.usgs.earthquake.distribution.*;
  3. import gov.usgs.util.Config;

  4. import java.util.logging.Level;
  5. import java.util.logging.Logger;

  6. /**
  7.  * Sends notifications directly to NATS streaming server using a NATS client
  8.  */
  9. public class NATSStreamingNotificationSender extends DefaultNotificationSender {

  10.   private static final Logger LOGGER = Logger
  11.           .getLogger(DefaultNotificationSender.class.getName());

  12.   private NATSClient client = new NATSClient();
  13.   private String subject;

  14.   @Override
  15.   public void configure(Config config) throws Exception{
  16.     super.configure(config);
  17.     client.configure(config);
  18.     subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
  19.     if (subject == null) {
  20.       throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
  21.     }
  22.   }

  23.   /**
  24.    * Publishes notification to NATS streaming server
  25.    *
  26.    * @param notification
  27.    *            The notification to send
  28.    * @throws Exception if something goes wrong with publish
  29.    */
  30.   @Override
  31.   public void sendNotification(final Notification notification) throws Exception {
  32.     String message = URLNotificationJSONConverter.toJSON((URLNotification) notification);
  33.     try {
  34.       client.getConnection().publish(subject, message.getBytes());
  35.     } catch (Exception e) {
  36.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception publishing NATSStreaming notification:");
  37.       throw e;
  38.     }
  39.   }

  40.   /**
  41.    * Starts NATSStreaming connection and superclasses
  42.    *
  43.    * @throws Exception if there's an issue with superclasses, generating a client ID, or connecting to server
  44.    */
  45.   @Override
  46.   public void startup() throws Exception {
  47.     super.startup();
  48.     client.startup();
  49.   }

  50.   /**
  51.    * Safely closes the NATSStreaming connection and superclasses
  52.    *
  53.    * @throws Exception if superclasses throw exceptions
  54.    */
  55.   @Override
  56.   public void shutdown() throws Exception {
  57.     client.shutdown();
  58.     super.shutdown();
  59.   }

  60.   /** @return NATSClient */
  61.   public NATSClient getClient() {
  62.     return client;
  63.   }

  64.   /** @param client NATSClient to set */
  65.   public void setClient(NATSClient client) {
  66.     this.client = client;
  67.   }

  68.   /** @return subject */
  69.   public String getSubject() {
  70.     return subject;
  71.   }

  72.   /** @param subject to set */
  73.   public void setSubject(String subject) {
  74.     this.subject = subject;
  75.   }
  76. }