DefaultNotificationReceiver.java
- /*
- * DefaultNotificationReceiver
- */
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.aws.JsonNotification;
- import gov.usgs.earthquake.distribution.roundrobinnotifier.RoundRobinListenerNotifier;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.earthquake.product.io.IOUtil;
- import gov.usgs.earthquake.product.io.JsonProduct;
- import gov.usgs.earthquake.product.io.ObjectProductSource;
- import gov.usgs.earthquake.product.io.ProductSource;
- import gov.usgs.earthquake.util.SizeLimitInputStream;
- import gov.usgs.util.Config;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.ObjectLock;
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.InputStream;
- import java.net.URL;
- import java.util.Date;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.logging.Logger;
- import javax.json.Json;
- import java.util.logging.Level;
- /**
- * The core of product distribution.
- *
- * A DefaultNotificationReceiver receives notifications and notifies listeners
- * of received notifications. NotificationListeners use the NotificationReceiver
- * to retrieve products referenced by notifications.
- *
- * The NotificationReceiver uses a NotificationIndex to track received
- * notifications, and a ProductStorage to store retrieved products.
- *
- * The DefaultNotificationReceiver implements the Configurable interface and
- * uses the following configuration parameters:
- *
- * Each listener has a separate queue of notifications. Each listener is
- * allocated one thread to process notifications from this queue.
- */
- public class DefaultNotificationReceiver extends DefaultConfigurable implements
- NotificationReceiver, NotificationIndexCleanup.Listener {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(DefaultNotificationReceiver.class.getName());
- /** Property referencing a notification index config section. */
- public static final String NOTIFICATION_INDEX_PROPERTY = "index";
- /** Shortcut to create a SQLite JDBCNotificationIndex. */
- public static final String INDEX_FILE_PROPERTY = "indexFile";
- /** Property referencing a product storage config section. */
- public static final String PRODUCT_STORAGE_PROPERTY = "storage";
- /** Shortcut to create a FileProductStorage. */
- public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";
- /** Property referencing how long to store products in milliseconds. */
- public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageAge";
- /** Default max age to store products, 3600000 milliseconds = 1 hour. */
- public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "3600000";
- /**
- * Property referencing how long to wait until checking for expired
- * notifications/products.
- */
- public static final String RECEIVER_CLEANUP_PROPERTY = "cleanupInterval";
- /**
- * Default time between checking for expired notifications/products, 900000
- * milliseconds = 15 minutes.
- */
- public static final String DEFAULT_RECEIVER_CLEANUP = "900000";
- /** Property for connection Timeout */
- public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
- /** Default connection timeout. 15 seconds */
- public static final String DEFAULT_CONNECT_TIMEOUT = "15000";
- /** Property for read timeout */
- public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
- /** default read timeout. 15 seconds */
- public static final String DEFAULT_READ_TIMEOUT = "15000";
- /** Property for listener notifier */
- public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
- /** Property for listener notifier to set to executor*/
- public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
- /** Property to listener notifier to set to future */
- public static final String FUTURE_LISTENER_NOTIFIER = "future";
- /** Property to listener notifier to set to roundrobin */
- public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";
- /** The notification index where received notifications are stored. */
- private NotificationIndex notificationIndex;
- /** The product storage where retrieved products are stored. */
- private ProductStorage productStorage;
- /** How long to store retrieved product, in milliseconds. */
- private Long productStorageMaxAge = 0L;
- /** How long to wait until checking for expired notifications/products. */
- private Long receiverCleanupInterval = 0L;
- /** Timer that schedules receiver cleanup task. */
- private Timer receiverCleanupTimer = new Timer();
- /** Notification cleanup */
- private NotificationIndexCleanup notificationCleanup = null;
- private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT);
- private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT);
- private ListenerNotifier notifier;
- /** A lock that is acquired when a product is being retrieved. */
- private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();
- /** Creates new ExecutorListenerNotifier to var notifier */
- public DefaultNotificationReceiver() {
- notifier = new ExecutorListenerNotifier(this);
- }
- /**
- * Add a new notification listener.
- *
- * @param listener
- * the listener to add. When notifications are received, this
- * listener will be notified.
- * @throws Exception exception
- */
- public void addNotificationListener(NotificationListener listener)
- throws Exception {
- notifier.addNotificationListener(listener);
- }
- /**
- * Remove an existing notification listener.
- *
- * Any currently queued notifications are processed before shutting down.
- *
- * @param listener
- * the listener to remove. When notifications are receive, this
- * listener will no longer be notified.
- * @throws Exception exception
- */
- public void removeNotificationListener(NotificationListener listener)
- throws Exception {
- notifier.removeNotificationListener(listener);
- }
- /**
- * Store a notification and notify listeners.
- *
- * Updates the notification index before notifying listeners of the newly
- * available product.
- *
- * @param notification
- * the notification being received.
- * @throws Exception
- * if the notificationIndex throws an Exception.
- */
- public void receiveNotification(Notification notification) throws Exception {
- // notification processed
- new ProductTracker(notification.getTrackerURL()).notificationReceived(
- this.getName(), notification);
- if (notification.getExpirationDate().before(new Date())) {
- LOGGER.finer("[" + getName()
- + "] skipping already expired notification for product id="
- + notification.getProductId().toString() + ", expiration="
- + notification.getExpirationDate().toString());
- } else {
- // add notification to index
- notificationIndex.addNotification(notification);
- if (notification instanceof JsonNotification) {
- LOGGER.finer("[" + getName() + "] json notification " +
- notification.getProductId());
- } else if (notification instanceof URLNotification) {
- LOGGER.finer("["
- + getName()
- + "] notification URL="
- + ((URLNotification) notification).getProductURL()
- .toString());
- }
- notifyListeners(notification);
- }
- }
- /**
- * Send a notification to all registered NotificationListeners.
- *
- * Creates a NotificationEvent, with a reference to this object and calls
- * each notificationListeners onNotification method in separate threads.
- *
- * This method usually returns before registered NotificationListeners have
- * completed processing a notification.
- *
- * @param notification
- * the notification being sent to listeners.
- * @throws Exception exception
- */
- protected void notifyListeners(final Notification notification)
- throws Exception {
- LOGGER.finest("[" + getName() + "] notifying listeners for product id="
- + notification.getProductId().toString());
- // queue notification for listeners
- NotificationEvent event = new NotificationEvent(this, notification);
- notifier.notifyListeners(event);
- }
- /** @return "Using notifier" */
- public String getListenerQueueStatus() {
- return "Using notifier";
- }
- /**
- * Search the notification index for expired notifications, removing any
- * that are found. When a notification in the index is not a
- * URLNotification, it represents a product in storage that will also be
- * removed.
- *
- * @throws Exception
- * if NotificationIndexCleanup throws an Exception.
- */
- public void removeExpiredNotifications() throws Exception {
- LOGGER.fine("[" + getName() + "] running receiver cleanup");
- // use NotificationIndexCleanup to manage cleanup in separate thread
- if (this.notificationCleanup == null) {
- this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
- this.notificationCleanup.startup();
- } else {
- this.notificationCleanup.wakeUp();
- }
- }
- /**
- * Callback from the NotificationIndexCleanup thread.
- *
- * Checks if Notification refers to a product in storage,
- * which should also be removed.
- *
- * @param notification
- * expired notification about to be removed.
- * @throws Exception
- */
- public void onExpiredNotification(final Notification notification) throws Exception {
- if (!(notification instanceof URLNotification)) {
- // if it isn't a url notification, it's also in storage
- productStorage.removeProduct(notification.getProductId());
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("[" + getName()
- + "] removed expired product from receiver cache "
- + notification.getProductId().toString());
- }
- }
- }
- /**
- * Retrieve a product by id.
- *
- * If this product is already in storage, load and return the product.
- * Otherwise, search notifications for this product, and download the
- * product into storage.
- *
- * @param id
- * the product to retrieve
- * @return the retrieved product, or null if not available.
- * @throws Exception exception
- */
- public Product retrieveProduct(ProductId id) throws Exception {
- Product product = null;
- String productIdString = id.toString();
- LOGGER.finest("[" + getName() + "] acquiring retrieve lock id="
- + productIdString);
- retrieveLocks.acquireLock(id);
- LOGGER.finest("[" + getName() + "] retrieve lock acquired id="
- + productIdString);
- try {
- if (productStorage.hasProduct(id)) {
- try {
- LOGGER.finest("[" + getName() + "] storing product id="
- + productIdString);
- product = productStorage.getProduct(id);
- LOGGER.finest("[" + getName() + "] product stored id="
- + productIdString);
- } catch (Exception e) {
- LOGGER.log(
- Level.FINE,
- "["
- + getName()
- + "] storage claims hasProduct, but threw exception",
- e);
- product = null;
- }
- }
- if (product == null) {
- LOGGER.finer("[" + getName()
- + "] don't have product yet, searching notifications");
- // don't have product yet, search notifications
- Iterator<Notification> iter = notificationIndex
- .findNotifications(id).iterator();
- while (product == null && iter.hasNext()) {
- Notification notification = iter.next();
- if (!(notification instanceof URLNotification)) {
- // only URL notifications include location info
- continue;
- }
- InputStream in = null;
- try {
- URL productURL = ((URLNotification) notification)
- .getProductURL();
- ProductSource productSource = null;
- SizeLimitInputStream sizeIn = null;
- final Date beginConnect = new Date();
- Date beginDownload = new Date();
- if (productURL.getProtocol().equals("data")) {
- product = new JsonProduct().getProduct(Json.createReader(
- StreamUtils.getInputStream(productURL)).readObject());
- // JSON notification with embedded product
- LOGGER.finer("[" + getName() + "] parsed json notification for "
- + product.getId().toString());
- productSource = new ObjectProductSource(product);
- } else {
- // URL notification
- LOGGER.finer("[" + getName() + "] notification url "
- + productURL.toString());
- in = StreamUtils.getURLInputStream(productURL,
- connectTimeout, readTimeout);
- beginDownload = new Date();
- // use size limit with negative limit to count transfer size
- sizeIn = new SizeLimitInputStream(in, -1);
- productSource = IOUtil.autoDetectProductSource(sizeIn);
- }
- Notification storedNotification = storeProductSource(productSource);
- final Date endDownload = new Date();
- final long connectTime = beginDownload.getTime() - beginConnect.getTime();
- final long downloadTime = endDownload.getTime() - beginDownload.getTime();
- final long downloadSize = sizeIn != null ? sizeIn.getRead() : 0;
- final long downloadRate = Math.round(downloadSize /
- (Math.max(downloadTime, 1L) / 1000.0));
- LOGGER.fine("[" + getName() + "] receiver retrieved product"
- + " id=" + id.toString()
- + " (connect = " + connectTime + " ms)"
- + " (rate = " + downloadRate + " bytes/s)"
- + " (size = " + downloadSize + " bytes)"
- + " (time = " + downloadTime + " ms)"
- + " from "
- + (productURL.getProtocol().equals("data")
- ? "data url"
- : productURL.toString()));
- LOGGER.finest("[" + getName()
- + "] after store product, notification="
- + storedNotification);
- if (productStorage.hasProduct(id)) {
- LOGGER.finer("[" + getName()
- + "] getting product from storage");
- product = productStorage.getProduct(id);
- LOGGER.finest("[" + getName()
- + "] after getProduct, product=" + product);
- try {
- new ProductTracker(notification.getTrackerURL())
- .productDownloaded(this.getName(), id);
- LOGGER.fine("[" + getName()
- + "] product downloaded from "
- + (productURL.getProtocol().equals("data")
- ? "data url"
- : productURL.toString()));
- } catch (Exception e) {
- LOGGER.log(
- Level.WARNING,
- "["
- + getName()
- + "] exception notifying tracker about downloaded product",
- e);
- }
- } else {
- LOGGER.finer("[" + getName()
- + "] product not in storage id="
- + productIdString);
- }
- } catch (Exception e) {
- if (e instanceof ProductAlreadyInStorageException
- || e.getCause() instanceof ProductAlreadyInStorageException) {
- LOGGER.finer("[" + getName()
- + "] product already in storage id="
- + productIdString);
- product = productStorage.getProduct(id);
- continue;
- }
- // log any exception that happened while retrieving
- // product
- if (e instanceof FileNotFoundException) {
- LOGGER.warning("["
- + getName()
- + "] exception while retrieving product, file not found");
- } else {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception while retrieving product", e);
- new ProductTracker(notification.getTrackerURL())
- .exception(this.getName(), id, e);
- }
- } finally {
- StreamUtils.closeStream(in);
- }
- }
- }
- } finally {
- LOGGER.finest("[" + getName() + "] releasing retrieve lock id="
- + productIdString);
- retrieveLocks.releaseLock(id);
- LOGGER.finest("[" + getName() + "] retrieve lock released id="
- + productIdString);
- }
- // return product
- return product;
- }
- /**
- * Calls the current <code>ProductStorage.storeProductSource</code> method.
- *
- * @param source
- * The <code>ProductSource</code> to store.
- * @return The <code>ProductId</code> of the product referenced by the given
- * <code>ProductSource</code>.
- * @throws Exception exception
- * @see gov.usgs.earthquake.distribution.ProductStorage
- */
- protected Notification storeProductSource(ProductSource source)
- throws Exception {
- Notification notification = null;
- // store product input
- ProductId id = productStorage.storeProductSource(source);
- // check if stored
- if (productStorage.hasProduct(id)) {
- Product product = productStorage.getProduct(id);
- // calculate storage expiration date
- Date expirationDate = new Date(new Date().getTime()
- + productStorageMaxAge);
- // update notification index
- notification = new DefaultNotification(id, expirationDate,
- product.getTrackerURL());
- notificationIndex.addNotification(notification);
- }
- return notification;
- }
- /**
- * Send matching notifications to listener.
- *
- * Searches the NotificationIndex for matching notifications, and sends a
- * NotificationEvent for each notification found.
- *
- * @param listener
- * the listener to receive a NotificationEvent for each found
- * notification.
- * @param sources
- * sources to include, or null for all.
- * @param types
- * types to include, or null for all.
- * @param codes
- * codes to include, or null for all.
- * @throws Exception
- * if the notification index or notification listener throw an
- * exception.
- */
- public void sendNotifications(NotificationListener listener,
- List<String> sources, List<String> types, List<String> codes)
- throws Exception {
- List<Notification> notifications = notificationIndex.findNotifications(
- sources, types, codes);
- Iterator<Notification> iter = notifications.iterator();
- while (iter.hasNext()) {
- listener.onNotification(new NotificationEvent(this, iter.next()));
- }
- }
- public void configure(Config config) throws Exception {
- String notificationIndexName = config
- .getProperty(NOTIFICATION_INDEX_PROPERTY);
- String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
- if (notificationIndexName == null && notificationIndexFile == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'index' is a required configuration property");
- }
- if (notificationIndexName != null) {
- LOGGER.config("[" + getName() + "] loading notification index '"
- + notificationIndexName + "'");
- notificationIndex = (NotificationIndex) Config.getConfig()
- .getObject(notificationIndexName);
- if (notificationIndex == null) {
- throw new ConfigurationException("[" + getName() + "] index '"
- + notificationIndexName
- + "' is not properly configured");
- }
- } else {
- LOGGER.config("[" + getName() + "] using notification index '"
- + notificationIndexFile + "'");
- notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
- }
- String productStorageName = config
- .getProperty(PRODUCT_STORAGE_PROPERTY);
- String storageDirectory = config
- .getProperty(STORAGE_DIRECTORY_PROPERTY);
- if (productStorageName == null && storageDirectory == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'storage' is a required configuration property");
- }
- if (productStorageName != null) {
- LOGGER.config("[" + getName() + "] loading product storage '"
- + productStorageName + "'");
- productStorage = (ProductStorage) Config.getConfig().getObject(
- productStorageName);
- if (productStorage == null) {
- throw new ConfigurationException("[" + getName()
- + "] storage '" + productStorageName
- + "' is not properly configured");
- }
- } else {
- LOGGER.config("[" + getName() + "] using storage directory '"
- + storageDirectory + "'");
- productStorage = new FileProductStorage(new File(storageDirectory));
- }
- productStorageMaxAge = Long.parseLong(config.getProperty(
- PRODUCT_STORAGE_MAX_AGE_PROPERTY,
- // previously all lower-case
- config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY.toLowerCase(),
- DEFAULT_PRODUCT_STORAGE_MAX_AGE)));
- LOGGER.config("[" + getName() + "] storage max age "
- + productStorageMaxAge + " ms");
- receiverCleanupInterval = Long.parseLong(config.getProperty(
- RECEIVER_CLEANUP_PROPERTY, DEFAULT_RECEIVER_CLEANUP));
- LOGGER.config("[" + getName() + "] receiver cleanup interval "
- + receiverCleanupInterval + " ms");
- connectTimeout = Integer.parseInt(config.getProperty(
- CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
- LOGGER.config("[" + getName() + "] receiver connect timeout "
- + connectTimeout + " ms");
- readTimeout = Integer.parseInt(config.getProperty(
- READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
- LOGGER.config("[" + getName() + "] receiver read timeout "
- + readTimeout + " ms");
- String notifierType = config.getProperty(LISTENER_NOTIFIER_PROPERTY);
- if (notifierType != null) {
- if (notifierType.equals(EXECUTOR_LISTENER_NOTIFIER)) {
- notifier = new ExecutorListenerNotifier(this);
- LOGGER.config("[" + getName()
- + "] using executor listener notifier");
- } else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
- notifier = new FutureListenerNotifier(this);
- } else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
- notifier = new RoundRobinListenerNotifier(this);
- LOGGER.config("[" + getName()
- + "] using round-robin listener notifier");
- } else {
- throw new ConfigurationException("Unknown notifier type "
- + notifierType);
- }
- }
- }
- public void shutdown() throws Exception {
- receiverCleanupTimer.cancel();
- if (notificationCleanup != null) {
- try {
- notificationCleanup.shutdown();
- notificationCleanup = null;
- } catch (Exception ignore) {
- }
- }
- try {
- notifier.shutdown();
- } catch (Exception ignore) {
- }
- try {
- notificationIndex.shutdown();
- } catch (Exception ignore) {
- }
- try {
- productStorage.shutdown();
- } catch (Exception ignore) {
- }
- }
- public void startup() throws Exception {
- if (productStorage == null) {
- throw new ConfigurationException("[" + getName()
- + "] storage has not been configured properly");
- }
- if (notificationIndex == null) {
- throw new ConfigurationException("[" + getName()
- + "] index has not been configured properly");
- }
- productStorage.startup();
- notificationIndex.startup();
- // only schedule cleanup if interval is non-zero
- if (receiverCleanupInterval > 0) {
- receiverCleanupTimer.scheduleAtFixedRate(new TimerTask() {
- public void run() {
- try {
- removeExpiredNotifications();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception during receiver cleanup", e);
- }
- }
- }, 0, receiverCleanupInterval);
- }
- // do this last since it may start processing
- notifier.startup();
- // ProductClient already started these listeners...
- // Iterator<NotificationListener> iter = notificationListeners.keySet()
- // .iterator();
- // while (iter.hasNext()) {
- // iter.next().startup();
- // }
- }
- /**
- * @return the notificationIndex
- */
- public NotificationIndex getNotificationIndex() {
- return notificationIndex;
- }
- /**
- * @param notificationIndex
- * the notificationIndex to set
- */
- public void setNotificationIndex(NotificationIndex notificationIndex) {
- this.notificationIndex = notificationIndex;
- }
- /**
- * @return the productStorage
- */
- public ProductStorage getProductStorage() {
- return productStorage;
- }
- /**
- * @param productStorage
- * the productStorage to set
- */
- public void setProductStorage(ProductStorage productStorage) {
- this.productStorage = productStorage;
- }
- /**
- * @return the productStorageMaxAge
- */
- public Long getProductStorageMaxAge() {
- return productStorageMaxAge;
- }
- /**
- * @param productStorageMaxAge
- * the productStorageMaxAge to set
- */
- public void setProductStorageMaxAge(Long productStorageMaxAge) {
- this.productStorageMaxAge = productStorageMaxAge;
- }
- /**
- * @return the QueueStatus or null if ExecutorListenerNotifier doesn't exist
- */
- public Map<String, Integer> getQueueStatus() {
- if (notifier instanceof ExecutorListenerNotifier) {
- return ((ExecutorListenerNotifier) notifier).getStatus();
- }
- return null;
- }
- /**
- * Throttle notifier queues
- * @throws InterruptedException InterruptedException
- */
- public void throttleQueues() throws InterruptedException {
- if (notifier instanceof ExecutorListenerNotifier) {
- ((ExecutorListenerNotifier) notifier).throttleQueues();
- }
- }
- /**
- * @return receiverCleanupInterval
- */
- public Long getReceiverCleanupInterval() {
- return receiverCleanupInterval;
- }
- /**
- * @param receiverCleanupInterval the receiverCleanupInterval to set
- */
- public void setReceiverCleanupInterval(Long receiverCleanupInterval) {
- this.receiverCleanupInterval = receiverCleanupInterval;
- }
- /**
- * @return connectionTimeout
- */
- public int getConnectTimeout() {
- return connectTimeout;
- }
- /**
- * @param connectTimeout int connectionTimeout to set
- */
- public void setConnectTimeout(int connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
- /**
- * @return ListenerNotifier
- */
- public ListenerNotifier getNotifier() {
- return this.notifier;
- }
- /**
- * @param notifier ListenerNotifier to set
- */
- public void setNotifier(final ListenerNotifier notifier) {
- this.notifier = notifier;
- }
- /**
- * @return readTimeout
- */
- public int getReadTimeout() {
- return readTimeout;
- }
- /**
- * @param readTimeout int readTimeout to set
- */
- public void setReadTimeout(int readTimeout) {
- this.readTimeout = readTimeout;
- }
- }