NATSStreamingNotificationSender.java
- package gov.usgs.earthquake.nats;
- import gov.usgs.earthquake.distribution.*;
- import gov.usgs.util.Config;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Sends notifications directly to NATS streaming server using a NATS client
- */
- public class NATSStreamingNotificationSender extends DefaultNotificationSender {
- private static final Logger LOGGER = Logger
- .getLogger(DefaultNotificationSender.class.getName());
- private NATSClient client = new NATSClient();
- private String subject;
- @Override
- public void configure(Config config) throws Exception{
- super.configure(config);
- client.configure(config);
- subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
- if (subject == null) {
- throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
- }
- }
- /**
- * Publishes notification to NATS streaming server
- *
- * @param notification
- * The notification to send
- * @throws Exception if something goes wrong with publish
- */
- @Override
- public void sendNotification(final Notification notification) throws Exception {
- String message = URLNotificationJSONConverter.toJSON((URLNotification) notification);
- try {
- client.getConnection().publish(subject, message.getBytes());
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName() + "] exception publishing NATSStreaming notification:");
- throw e;
- }
- }
- /**
- * Starts NATSStreaming connection and superclasses
- *
- * @throws Exception if there's an issue with superclasses, generating a client ID, or connecting to server
- */
- @Override
- public void startup() throws Exception {
- super.startup();
- client.startup();
- }
- /**
- * Safely closes the NATSStreaming connection and superclasses
- *
- * @throws Exception if superclasses throw exceptions
- */
- @Override
- public void shutdown() throws Exception {
- client.shutdown();
- super.shutdown();
- }
- /** @return NATSClient */
- public NATSClient getClient() {
- return client;
- }
- /** @param client NATSClient to set */
- public void setClient(NATSClient client) {
- this.client = client;
- }
- /** @return subject */
- public String getSubject() {
- return subject;
- }
- /** @param subject to set */
- public void setSubject(String subject) {
- this.subject = subject;
- }
- }