// DefaultNotificationSender.java
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The base class for all Notification senders.
*
* The DefaultNotificationSender addresses the general need to send notifications. It extends
* DefaultNotificationListener to allow forwarding of products from any subclass of
* DefaultNotificationReceiver.
*/
public class DefaultNotificationSender extends DefaultNotificationListener {
private static final Logger LOGGER = Logger
.getLogger(DefaultNotificationSender.class.getName());
/** Property referencing the server host */
public static final String SERVER_HOST_PROPERTY = "serverHost";
/** Property referencing the server port */
public static final String SERVER_PORT_PROPERTY = "serverPort";
/** Property referencing product storage object to use */
public static final String PRODUCT_STORAGE_PROPERTY = "storage";
/** Property referencing the length of time products should be held in storage*/
public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageage";
/** property for max age of product in storage. 7000 days? */
public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "604800000";
/** Variable for String serverHost */
protected String serverHost;
/** Variable for String serverPort */
protected String serverPort;
/** Variable for URL productStorage */
protected URLProductStorage productStorage;
/** Variable for long productStorageMaxAge */
protected long productStorageMaxAge;
/**
* Configures based on configuration section.
*
* @param config
* The config
* @throws Exception if something goes wrong
*/
public void configure(Config config) throws Exception {
// let default notification listener configure itself
super.configure(config);
if (getNotificationIndex() == null) {
throw new ConfigurationException("[" + getName()
+ "] 'index' is a required configuration property");
}
String productStorageName = config.getProperty(PRODUCT_STORAGE_PROPERTY);
if (productStorageName == null) {
throw new ConfigurationException("[" + getName() + "] '" + PRODUCT_STORAGE_PROPERTY + "' is a required property.");
}
LOGGER.config("[" + getName() + "] loading product storage '" + productStorageName + "'.");
this.productStorage = (URLProductStorage) Config.getConfig().getObject(productStorageName);
if (productStorage == null) {
throw new ConfigurationException("[" + getName() + "] product storage '" + productStorageName + "' improperly configured.");
}
productStorageMaxAge = Long.parseLong(config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY,DEFAULT_PRODUCT_STORAGE_MAX_AGE));
LOGGER.config("[" + getName() + "] product storage max age: " + productStorageMaxAge + "ms");
serverHost = config.getProperty(SERVER_HOST_PROPERTY);
LOGGER.config("[" + getName() + "] messenger server host: " + serverHost);
serverPort = config.getProperty(SERVER_PORT_PROPERTY);
LOGGER.config("[" + getName() + "] messenger server port: " + serverPort);
}
/**
* Called on receipt of a new product. Stores this product and calls sendMessage()
* Most of this logic was lifted from the pre-08/2019 EIDSNotificationSender class.
*
* @param product
* a product whose notification was accepted.
* @throws Exception if something goes wrong
*/
public void onProduct(final Product product) throws Exception {
ProductId id = product.getId();
// store product
try {
productStorage.storeProduct(product);
} catch (ProductAlreadyInStorageException e) {
// ignore
}
// create notification
// make expiration relative to now
Date expirationDate = new Date(new Date().getTime()
+ productStorageMaxAge);
URLNotification notification = new URLNotification(id, expirationDate,
product.getTrackerURL(), productStorage.getProductURL(id));
// remove any existing notifications, generally there won't be any
Iterator<Notification> existing = getNotificationIndex()
.findNotifications(id).iterator();
while (existing.hasNext()) {
getNotificationIndex().removeNotification(existing.next());
}
// send notification
try {
sendNotification(notification);
} catch (Exception e) {
// if fails, try to remove from storage
productStorage.removeProduct(id);
throw e;
}
// add created notification to index. Used to track which products
// have been processed, and to delete after expirationDate
// done after send in case send fails
getNotificationIndex().addNotification(notification);
// track that notification was sent
new ProductTracker(notification.getTrackerURL()).notificationSent(
this.getName(), notification);
}
/**
* Called just before this listener processes a notification.
*
* @param notification
* notification about to be processed.
* @return true to process the notification, false to skip
* @throws Exception if something goes wrong
*/
@Override
protected boolean onBeforeProcessNotification(
final Notification notification) throws Exception {
if (!isProcessDuplicates()) {
// only check if we care
List<Notification> notifications = getNotificationIndex()
.findNotifications(notification.getProductId());
if (notifications.size() > 0) {
if (productStorage.hasProduct(notification.getProductId())) {
LOGGER.finer("[" + getName()
+ "] skipping existing product "
+ notification.getProductId().toString());
return false;
} else {
LOGGER.finer("["
+ getName()
+ "] found notifications, but product missing from storage "
+ notification.getProductId().toString());
}
}
}
return true;
}
@Override
protected void onAfterProcessNotification(final Notification notification) {
// function replaced so notifications not added to index
// this class responds to the index
}
/**
* Called when a notification expires
*
* @param notification
* The expired notification
* @throws Exception if something goes wrong
*/
@Override
public void onExpiredNotification(final Notification notification) throws Exception{
List<Notification> notifications = getNotificationIndex()
.findNotifications(notification.getProductId());
if (notifications.size() <= 1) {
// this is called before removing notification from index.
productStorage.removeProduct(notification.getProductId());
LOGGER.finer("[" + getName()
+ "] removed expired product from sender storage "
+ notification.getProductId().toString());
} else {
// still have notifications left for product, don't remove
}
}
/**
* Utility method to do the actual notification sending. Should be overridden by subclasses.
*
* @param notification
* The notification to send
* @throws Exception if something goes wrong
*/
protected void sendNotification(final Notification notification) throws Exception {
LOGGER.info("[" + getName() + "] sent message " + notification.toString());
}
/**
* Start up storage
*
* @throws Exception if something goes wrong
*/
public void startup() throws Exception{
productStorage.startup();
super.startup();
}
/**
* Shut down storage
*
* @throws Exception if something goes wrong
*/
public void shutdown() throws Exception{
super.shutdown();
try {
productStorage.shutdown();
} catch (Exception e) {
//do nothing
}
}
/** @return serverHost */
public String getServerHost() {
return serverHost;
}
/** @param serverHost string to set */
public void setServerHost(String serverHost) {
this.serverHost = serverHost;
}
/** @return serverPort */
public String getServerPort() {
return serverPort;
}
/** @param serverPort string to set */
public void setServerPort(String serverPort) {
this.serverPort = serverPort;
}
/** @return productStorage */
public URLProductStorage getProductStorage() {
return productStorage;
}
/** @param productStorage URLProductStorage to set */
public void setProductStorage(URLProductStorage productStorage) {
this.productStorage = productStorage;
}
/** @return productStorageMaxAge */
public long getProductStorageMaxAge() {
return productStorageMaxAge;
}
/** @param productStorageMaxAge long to set */
public void setProductStorageMaxAge(long productStorageMaxAge) {
this.productStorageMaxAge = productStorageMaxAge;
}
}