DefaultNotificationReceiver.java

  1. /*
  2.  * DefaultNotificationReceiver
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import gov.usgs.earthquake.aws.JsonNotification;
  6. import gov.usgs.earthquake.distribution.roundrobinnotifier.RoundRobinListenerNotifier;
  7. import gov.usgs.earthquake.product.Product;
  8. import gov.usgs.earthquake.product.ProductId;
  9. import gov.usgs.earthquake.product.io.IOUtil;
  10. import gov.usgs.earthquake.product.io.JsonProduct;
  11. import gov.usgs.earthquake.product.io.ObjectProductSource;
  12. import gov.usgs.earthquake.product.io.ProductSource;
  13. import gov.usgs.earthquake.util.SizeLimitInputStream;
  14. import gov.usgs.util.Config;
  15. import gov.usgs.util.DefaultConfigurable;
  16. import gov.usgs.util.StreamUtils;
  17. import gov.usgs.util.ObjectLock;

  18. import java.io.File;
  19. import java.io.FileNotFoundException;
  20. import java.io.InputStream;
  21. import java.net.URL;
  22. import java.util.Date;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Timer;
  27. import java.util.TimerTask;
  28. import java.util.logging.Logger;

  29. import javax.json.Json;

  30. import java.util.logging.Level;

  31. /**
  32.  * The core of product distribution.
  33.  *
  34.  * A DefaultNotificationReceiver receives notifications and notifies listeners
  35.  * of received notifications. NotificationListeners use the NotificationReceiver
  36.  * to retrieve products referenced by notifications.
  37.  *
  38.  * The NotificationReceiver uses a NotificationIndex to track received
  39.  * notifications, and a ProductStorage to store retrieved products.
  40.  *
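 * The DefaultNotificationReceiver implements the Configurable interface and
 * uses the following configuration parameters:
 *
 * <ul>
 * <li><code>index</code> or <code>indexFile</code> (one is required): a
 * notification index config section, or a file used to create a
 * JDBCNotificationIndex.</li>
 * <li><code>storage</code> or <code>storageDirectory</code> (one is
 * required): a product storage config section, or a directory used to create
 * a FileProductStorage.</li>
 * <li><code>storageAge</code>: how long to store retrieved products, in
 * milliseconds (default 3600000).</li>
 * <li><code>cleanupInterval</code>: time between checks for expired
 * notifications/products, in milliseconds (default 900000).</li>
 * <li><code>connectTimeout</code>: connect timeout in milliseconds (default
 * 15000).</li>
 * <li><code>readTimeout</code>: read timeout in milliseconds (default
 * 15000).</li>
 * <li><code>listenerNotifier</code>: one of <code>executor</code>,
 * <code>future</code> or <code>roundrobin</code>.</li>
 * </ul>
 *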
  44.  * Each listener has a separate queue of notifications. Each listener is
  45.  * allocated one thread to process notifications from this queue.
  46.  */
  47. public class DefaultNotificationReceiver extends DefaultConfigurable implements
  48.         NotificationReceiver, NotificationIndexCleanup.Listener {

    /** Logging object. */
    private static final Logger LOGGER = Logger
            .getLogger(DefaultNotificationReceiver.class.getName());

    /** Property naming the config section of the notification index to load. */
    public static final String NOTIFICATION_INDEX_PROPERTY = "index";

    /**
     * Shortcut property: a file used to create a JDBCNotificationIndex when
     * no index section is named.
     */
    public static final String INDEX_FILE_PROPERTY = "indexFile";

    /** Property naming the config section of the product storage to load. */
    public static final String PRODUCT_STORAGE_PROPERTY = "storage";

    /**
     * Shortcut property: a directory used to create a FileProductStorage when
     * no storage section is named.
     */
    public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

    /** Property referencing how long to store products in milliseconds. */
    public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageAge";

    /** Default max age to store products, 3600000 milliseconds = 1 hour. */
    public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "3600000";

    /**
     * Property referencing how long to wait, in milliseconds, between checks
     * for expired notifications/products.
     */
    public static final String RECEIVER_CLEANUP_PROPERTY = "cleanupInterval";

    /**
     * Default time between checking for expired notifications/products, 900000
     * milliseconds = 15 minutes.
     */
    public static final String DEFAULT_RECEIVER_CLEANUP = "900000";

    /** Property for the connect timeout, in milliseconds. */
    public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
    /** Default connect timeout, 15000 milliseconds = 15 seconds. */
    public static final String DEFAULT_CONNECT_TIMEOUT = "15000";
    /** Property for the read timeout, in milliseconds. */
    public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
    /** Default read timeout, 15000 milliseconds = 15 seconds. */
    public static final String DEFAULT_READ_TIMEOUT = "15000";

    /** Property selecting which listener notifier implementation to use. */
    public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
    /** Listener notifier value that selects an ExecutorListenerNotifier. */
    public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
    /** Listener notifier value that selects a FutureListenerNotifier. */
    public static final String FUTURE_LISTENER_NOTIFIER = "future";
    /** Listener notifier value that selects a RoundRobinListenerNotifier. */
    public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";

    /** The notification index where received notifications are stored. */
    private NotificationIndex notificationIndex;

    /** The product storage where retrieved products are stored. */
    private ProductStorage productStorage;

    /** How long to store retrieved product, in milliseconds. */
    private Long productStorageMaxAge = 0L;

    /**
     * How long to wait between checks for expired notifications/products, in
     * milliseconds.
     */
    private Long receiverCleanupInterval = 0L;

    /** Timer that schedules receiver cleanup task. */
    private Timer receiverCleanupTimer = new Timer();

    /** Notification cleanup, created the first time cleanup runs. */
    private NotificationIndexCleanup notificationCleanup = null;

    /** Connect timeout in milliseconds, used when opening product URLs. */
    private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT);
    /** Read timeout in milliseconds, used when opening product URLs. */
    private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT);

    /** Notifier that manages listeners and delivers notification events. */
    private ListenerNotifier notifier;

    /** A lock, keyed by product id, acquired while a product is being retrieved. */
    private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();

  107.     /** Creates new ExecutorListenerNotifier to var notifier */
  108.     public DefaultNotificationReceiver() {
  109.         notifier = new ExecutorListenerNotifier(this);
  110.     }

  111.     /**
  112.      * Add a new notification listener.
  113.      *
  114.      * @param listener
  115.      *            the listener to add. When notifications are received, this
  116.      *            listener will be notified.
  117.      * @throws Exception exception
  118.      */
  119.     public void addNotificationListener(NotificationListener listener)
  120.             throws Exception {
  121.         notifier.addNotificationListener(listener);
  122.     }

  123.     /**
  124.      * Remove an existing notification listener.
  125.      *
  126.      * Any currently queued notifications are processed before shutting down.
  127.      *
  128.      * @param listener
  129.      *            the listener to remove. When notifications are receive, this
  130.      *            listener will no longer be notified.
  131.      * @throws Exception exception
  132.      */
  133.     public void removeNotificationListener(NotificationListener listener)
  134.             throws Exception {
  135.         notifier.removeNotificationListener(listener);
  136.     }

  137.     /**
  138.      * Store a notification and notify listeners.
  139.      *
  140.      * Updates the notification index before notifying listeners of the newly
  141.      * available product.
  142.      *
  143.      * @param notification
  144.      *            the notification being received.
  145.      * @throws Exception
  146.      *             if the notificationIndex throws an Exception.
  147.      */
  148.     public void receiveNotification(Notification notification) throws Exception {
  149.         // notification processed
  150.         new ProductTracker(notification.getTrackerURL()).notificationReceived(
  151.                 this.getName(), notification);

  152.         if (notification.getExpirationDate().before(new Date())) {
  153.             LOGGER.finer("[" + getName()
  154.                     + "] skipping already expired notification for product id="
  155.                     + notification.getProductId().toString() + ", expiration="
  156.                     + notification.getExpirationDate().toString());
  157.         } else {
  158.             // add notification to index
  159.             notificationIndex.addNotification(notification);

  160.             if (notification instanceof JsonNotification) {
  161.                 LOGGER.finer("[" + getName() + "] json notification " +
  162.                         notification.getProductId());
  163.             } else if (notification instanceof URLNotification) {
  164.                 LOGGER.finer("["
  165.                         + getName()
  166.                         + "] notification URL="
  167.                         + ((URLNotification) notification).getProductURL()
  168.                                 .toString());
  169.             }

  170.             notifyListeners(notification);
  171.         }
  172.     }

  173.     /**
  174.      * Send a notification to all registered NotificationListeners.
  175.      *
  176.      * Creates a NotificationEvent, with a reference to this object and calls
  177.      * each notificationListeners onNotification method in separate threads.
  178.      *
  179.      * This method usually returns before registered NotificationListeners have
  180.      * completed processing a notification.
  181.      *
  182.      * @param notification
  183.      *            the notification being sent to listeners.
  184.      * @throws Exception exception
  185.      */
  186.     protected void notifyListeners(final Notification notification)
  187.             throws Exception {

  188.         LOGGER.finest("[" + getName() + "] notifying listeners for product id="
  189.                 + notification.getProductId().toString());

  190.         // queue notification for listeners
  191.         NotificationEvent event = new NotificationEvent(this, notification);
  192.         notifier.notifyListeners(event);
  193.     }
  194.     /** @return "Using notifier" */
  195.     public String getListenerQueueStatus() {
  196.         return "Using notifier";
  197.     }

  198.     /**
  199.      * Search the notification index for expired notifications, removing any
  200.      * that are found. When a notification in the index is not a
  201.      * URLNotification, it represents a product in storage that will also be
  202.      * removed.
  203.      *
  204.      * @throws Exception
  205.      *             if NotificationIndexCleanup throws an Exception.
  206.      */
  207.     public void removeExpiredNotifications() throws Exception {
  208.         LOGGER.fine("[" + getName() + "] running receiver cleanup");
  209.         // use NotificationIndexCleanup to manage cleanup in separate thread
  210.         if (this.notificationCleanup == null) {
  211.             this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
  212.             this.notificationCleanup.startup();
  213.         } else {
  214.             this.notificationCleanup.wakeUp();
  215.         }
  216.     }

  217.     /**
  218.      * Callback from the NotificationIndexCleanup thread.
  219.      *
  220.      * Checks if Notification refers to a product in storage,
  221.      * which should also be removed.
  222.      *
  223.      * @param notification
  224.      *     expired notification about to be removed.
  225.      * @throws Exception
  226.      */
  227.     public void onExpiredNotification(final Notification notification) throws Exception {
  228.         if (!(notification instanceof URLNotification)) {
  229.             // if it isn't a url notification, it's also in storage
  230.             productStorage.removeProduct(notification.getProductId());
  231.             if (LOGGER.isLoggable(Level.FINEST)) {
  232.                 LOGGER.finest("[" + getName()
  233.                         + "] removed expired product from receiver cache "
  234.                         + notification.getProductId().toString());
  235.             }
  236.         }
  237.     }

  238.     /**
  239.      * Retrieve a product by id.
  240.      *
  241.      * If this product is already in storage, load and return the product.
  242.      * Otherwise, search notifications for this product, and download the
  243.      * product into storage.
  244.      *
  245.      * @param id
  246.      *            the product to retrieve
  247.      * @return the retrieved product, or null if not available.
  248.      * @throws Exception exception
  249.      */
  250.     public Product retrieveProduct(ProductId id) throws Exception {
  251.         Product product = null;
  252.         String productIdString = id.toString();

  253.         LOGGER.finest("[" + getName() + "] acquiring retrieve lock id="
  254.                 + productIdString);
  255.         retrieveLocks.acquireLock(id);
  256.         LOGGER.finest("[" + getName() + "] retrieve lock acquired id="
  257.                 + productIdString);
  258.         try {
  259.             if (productStorage.hasProduct(id)) {
  260.                 try {
  261.                     LOGGER.finest("[" + getName() + "] storing product id="
  262.                             + productIdString);
  263.                     product = productStorage.getProduct(id);
  264.                     LOGGER.finest("[" + getName() + "] product stored id="
  265.                             + productIdString);
  266.                 } catch (Exception e) {
  267.                     LOGGER.log(
  268.                             Level.FINE,
  269.                             "["
  270.                                     + getName()
  271.                                     + "] storage claims hasProduct, but threw exception",
  272.                             e);
  273.                     product = null;
  274.                 }
  275.             }

  276.             if (product == null) {
  277.                 LOGGER.finer("[" + getName()
  278.                         + "] don't have product yet, searching notifications");
  279.                 // don't have product yet, search notifications
  280.                 Iterator<Notification> iter = notificationIndex
  281.                         .findNotifications(id).iterator();
  282.                 while (product == null && iter.hasNext()) {
  283.                     Notification notification = iter.next();
  284.                     if (!(notification instanceof URLNotification)) {
  285.                         // only URL notifications include location info
  286.                         continue;
  287.                     }

  288.                     InputStream in = null;
  289.                     try {
  290.                         URL productURL = ((URLNotification) notification)
  291.                                 .getProductURL();

  292.                         ProductSource productSource = null;
  293.                         SizeLimitInputStream sizeIn = null;

  294.                         final Date beginConnect = new Date();
  295.                         Date beginDownload = new Date();
  296.                         if (productURL.getProtocol().equals("data")) {
  297.                             product = new JsonProduct().getProduct(Json.createReader(
  298.                                     StreamUtils.getInputStream(productURL)).readObject());
  299.                             // JSON notification with embedded product
  300.                             LOGGER.finer("[" + getName() + "] parsed json notification for "
  301.                                     + product.getId().toString());
  302.                             productSource = new ObjectProductSource(product);
  303.                         } else {
  304.                             // URL notification
  305.                             LOGGER.finer("[" + getName() + "] notification url "
  306.                                     + productURL.toString());

  307.                             in = StreamUtils.getURLInputStream(productURL,
  308.                                     connectTimeout, readTimeout);
  309.                             beginDownload = new Date();
  310.                             // use size limit with negative limit to count transfer size
  311.                             sizeIn = new SizeLimitInputStream(in, -1);
  312.                             productSource = IOUtil.autoDetectProductSource(sizeIn);
  313.                         }

  314.                         Notification storedNotification = storeProductSource(productSource);

  315.                         final Date endDownload = new Date();
  316.                         final long connectTime = beginDownload.getTime() - beginConnect.getTime();
  317.                         final long downloadTime = endDownload.getTime() - beginDownload.getTime();
  318.                         final long downloadSize = sizeIn != null ? sizeIn.getRead() : 0;
  319.                         final long downloadRate = Math.round(downloadSize /
  320.                                 (Math.max(downloadTime, 1L) / 1000.0));

  321.                         LOGGER.fine("[" + getName() + "] receiver retrieved product"
  322.                                 + " id=" + id.toString()
  323.                                 + " (connect = " + connectTime + " ms)"
  324.                                 + " (rate = " + downloadRate +  " bytes/s)"
  325.                                 + " (size = " + downloadSize + " bytes)"
  326.                                 + " (time = " + downloadTime + " ms)"
  327.                                 + " from "
  328.                                 + (productURL.getProtocol().equals("data")
  329.                                         ? "data url"
  330.                                         : productURL.toString()));

  331.                         LOGGER.finest("[" + getName()
  332.                                 + "] after store product, notification="
  333.                                 + storedNotification);

  334.                         if (productStorage.hasProduct(id)) {
  335.                             LOGGER.finer("[" + getName()
  336.                                     + "] getting product from storage");
  337.                             product = productStorage.getProduct(id);
  338.                             LOGGER.finest("[" + getName()
  339.                                     + "] after getProduct, product=" + product);

  340.                             try {
  341.                                 new ProductTracker(notification.getTrackerURL())
  342.                                         .productDownloaded(this.getName(), id);
  343.                                 LOGGER.fine("[" + getName()
  344.                                         + "] product downloaded from "
  345.                                         + (productURL.getProtocol().equals("data")
  346.                                                 ? "data url"
  347.                                                 : productURL.toString()));
  348.                             } catch (Exception e) {
  349.                                 LOGGER.log(
  350.                                         Level.WARNING,
  351.                                         "["
  352.                                                 + getName()
  353.                                                 + "] exception notifying tracker about downloaded product",
  354.                                         e);
  355.                             }
  356.                         } else {
  357.                             LOGGER.finer("[" + getName()
  358.                                     + "] product not in storage id="
  359.                                     + productIdString);
  360.                         }
  361.                     } catch (Exception e) {
  362.                         if (e instanceof ProductAlreadyInStorageException
  363.                                 || e.getCause() instanceof ProductAlreadyInStorageException) {
  364.                             LOGGER.finer("[" + getName()
  365.                                     + "] product already in storage id="
  366.                                     + productIdString);
  367.                             product = productStorage.getProduct(id);
  368.                             continue;
  369.                         }

  370.                         // log any exception that happened while retrieving
  371.                         // product
  372.                         if (e instanceof FileNotFoundException) {
  373.                             LOGGER.warning("["
  374.                                     + getName()
  375.                                     + "] exception while retrieving product, file not found");
  376.                         } else {
  377.                             LOGGER.log(Level.WARNING, "[" + getName()
  378.                                     + "] exception while retrieving product", e);
  379.                             new ProductTracker(notification.getTrackerURL())
  380.                                     .exception(this.getName(), id, e);
  381.                         }
  382.                     } finally {
  383.                         StreamUtils.closeStream(in);
  384.                     }
  385.                 }
  386.             }
  387.         } finally {
  388.             LOGGER.finest("[" + getName() + "] releasing retrieve lock id="
  389.                     + productIdString);
  390.             retrieveLocks.releaseLock(id);
  391.             LOGGER.finest("[" + getName() + "] retrieve lock released id="
  392.                     + productIdString);
  393.         }

  394.         // return product
  395.         return product;
  396.     }

  397.     /**
  398.      * Calls the current <code>ProductStorage.storeProductSource</code> method.
  399.      *
  400.      * @param source
  401.      *            The <code>ProductSource</code> to store.
  402.      * @return The <code>ProductId</code> of the product referenced by the given
  403.      *         <code>ProductSource</code>.
  404.      * @throws Exception exception
  405.      * @see gov.usgs.earthquake.distribution.ProductStorage
  406.      */
  407.     protected Notification storeProductSource(ProductSource source)
  408.             throws Exception {
  409.         Notification notification = null;

  410.         // store product input
  411.         ProductId id = productStorage.storeProductSource(source);

  412.         // check if stored
  413.         if (productStorage.hasProduct(id)) {
  414.             Product product = productStorage.getProduct(id);

  415.             // calculate storage expiration date
  416.             Date expirationDate = new Date(new Date().getTime()
  417.                     + productStorageMaxAge);

  418.             // update notification index
  419.             notification = new DefaultNotification(id, expirationDate,
  420.                     product.getTrackerURL());
  421.             notificationIndex.addNotification(notification);
  422.         }

  423.         return notification;
  424.     }

  425.     /**
  426.      * Send matching notifications to listener.
  427.      *
  428.      * Searches the NotificationIndex for matching notifications, and sends a
  429.      * NotificationEvent for each notification found.
  430.      *
  431.      * @param listener
  432.      *            the listener to receive a NotificationEvent for each found
  433.      *            notification.
  434.      * @param sources
  435.      *            sources to include, or null for all.
  436.      * @param types
  437.      *            types to include, or null for all.
  438.      * @param codes
  439.      *            codes to include, or null for all.
  440.      * @throws Exception
  441.      *             if the notification index or notification listener throw an
  442.      *             exception.
  443.      */
  444.     public void sendNotifications(NotificationListener listener,
  445.             List<String> sources, List<String> types, List<String> codes)
  446.             throws Exception {
  447.         List<Notification> notifications = notificationIndex.findNotifications(
  448.                 sources, types, codes);
  449.         Iterator<Notification> iter = notifications.iterator();
  450.         while (iter.hasNext()) {
  451.             listener.onNotification(new NotificationEvent(this, iter.next()));
  452.         }
  453.     }

  454.     public void configure(Config config) throws Exception {
  455.         String notificationIndexName = config
  456.                 .getProperty(NOTIFICATION_INDEX_PROPERTY);
  457.         String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
  458.         if (notificationIndexName == null && notificationIndexFile == null) {
  459.             throw new ConfigurationException("[" + getName()
  460.                     + "] 'index' is a required configuration property");
  461.         }
  462.         if (notificationIndexName != null) {
  463.             LOGGER.config("[" + getName() + "] loading notification index '"
  464.                     + notificationIndexName + "'");
  465.             notificationIndex = (NotificationIndex) Config.getConfig()
  466.                     .getObject(notificationIndexName);
  467.             if (notificationIndex == null) {
  468.                 throw new ConfigurationException("[" + getName() + "] index '"
  469.                         + notificationIndexName
  470.                         + "' is not properly configured");
  471.             }
  472.         } else {
  473.             LOGGER.config("[" + getName() + "] using notification index '"
  474.                     + notificationIndexFile + "'");
  475.             notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
  476.         }

  477.         String productStorageName = config
  478.                 .getProperty(PRODUCT_STORAGE_PROPERTY);
  479.         String storageDirectory = config
  480.                 .getProperty(STORAGE_DIRECTORY_PROPERTY);
  481.         if (productStorageName == null && storageDirectory == null) {
  482.             throw new ConfigurationException("[" + getName()
  483.                     + "] 'storage' is a required configuration property");
  484.         }
  485.         if (productStorageName != null) {
  486.             LOGGER.config("[" + getName() + "] loading product storage '"
  487.                     + productStorageName + "'");
  488.             productStorage = (ProductStorage) Config.getConfig().getObject(
  489.                     productStorageName);
  490.             if (productStorage == null) {
  491.                 throw new ConfigurationException("[" + getName()
  492.                         + "] storage '" + productStorageName
  493.                         + "' is not properly configured");
  494.             }
  495.         } else {
  496.             LOGGER.config("[" + getName() + "] using storage directory '"
  497.                     + storageDirectory + "'");
  498.             productStorage = new FileProductStorage(new File(storageDirectory));
  499.         }

  500.         productStorageMaxAge = Long.parseLong(config.getProperty(
  501.                 PRODUCT_STORAGE_MAX_AGE_PROPERTY,
  502.                 // previously all lower-case
  503.                 config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY.toLowerCase(),
  504.                         DEFAULT_PRODUCT_STORAGE_MAX_AGE)));
  505.         LOGGER.config("[" + getName() + "] storage max age "
  506.                 + productStorageMaxAge + " ms");

  507.         receiverCleanupInterval = Long.parseLong(config.getProperty(
  508.                 RECEIVER_CLEANUP_PROPERTY, DEFAULT_RECEIVER_CLEANUP));
  509.         LOGGER.config("[" + getName() + "] receiver cleanup interval "
  510.                 + receiverCleanupInterval + " ms");

  511.         connectTimeout = Integer.parseInt(config.getProperty(
  512.                 CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
  513.         LOGGER.config("[" + getName() + "] receiver connect timeout "
  514.                 + connectTimeout + " ms");

  515.         readTimeout = Integer.parseInt(config.getProperty(
  516.                 READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
  517.         LOGGER.config("[" + getName() + "] receiver read timeout "
  518.                 + readTimeout + " ms");

  519.         String notifierType = config.getProperty(LISTENER_NOTIFIER_PROPERTY);
  520.         if (notifierType != null) {
  521.             if (notifierType.equals(EXECUTOR_LISTENER_NOTIFIER)) {
  522.                 notifier = new ExecutorListenerNotifier(this);
  523.                 LOGGER.config("[" + getName()
  524.                         + "] using executor listener notifier");
  525.             } else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
  526.                 notifier = new FutureListenerNotifier(this);
  527.             } else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
  528.                 notifier = new RoundRobinListenerNotifier(this);
  529.                 LOGGER.config("[" + getName()
  530.                         + "] using round-robin listener notifier");
  531.             } else {
  532.                 throw new ConfigurationException("Unknown notifier type "
  533.                         + notifierType);
  534.             }
  535.         }
  536.     }

  537.     public void shutdown() throws Exception {
  538.         receiverCleanupTimer.cancel();
  539.         if (notificationCleanup != null) {
  540.             try {
  541.                 notificationCleanup.shutdown();
  542.                 notificationCleanup = null;
  543.             } catch (Exception ignore) {
  544.             }
  545.         }
  546.         try {
  547.             notifier.shutdown();
  548.         } catch (Exception ignore) {
  549.         }
  550.         try {
  551.             notificationIndex.shutdown();
  552.         } catch (Exception ignore) {
  553.         }
  554.         try {
  555.             productStorage.shutdown();
  556.         } catch (Exception ignore) {
  557.         }
  558.     }

  559.     public void startup() throws Exception {
  560.         if (productStorage == null) {
  561.             throw new ConfigurationException("[" + getName()
  562.                     + "] storage has not been configured properly");
  563.         }
  564.         if (notificationIndex == null) {
  565.             throw new ConfigurationException("[" + getName()
  566.                     + "] index has not been configured properly");
  567.         }
  568.         productStorage.startup();
  569.         notificationIndex.startup();

  570.         // only schedule cleanup if interval is non-zero
  571.         if (receiverCleanupInterval > 0) {
  572.             receiverCleanupTimer.scheduleAtFixedRate(new TimerTask() {
  573.                 public void run() {
  574.                     try {
  575.                         removeExpiredNotifications();
  576.                     } catch (Exception e) {
  577.                         LOGGER.log(Level.WARNING, "[" + getName()
  578.                                 + "] exception during receiver cleanup", e);
  579.                     }
  580.                 }
  581.             }, 0, receiverCleanupInterval);
  582.         }

  583.         // do this last since it may start processing
  584.         notifier.startup();

  585.         // ProductClient already started these listeners...
  586.         // Iterator<NotificationListener> iter = notificationListeners.keySet()
  587.         // .iterator();
  588.         // while (iter.hasNext()) {
  589.         // iter.next().startup();
  590.         // }
  591.     }

  592.     /**
  593.      * @return the notificationIndex
  594.      */
  595.     public NotificationIndex getNotificationIndex() {
  596.         return notificationIndex;
  597.     }

    /**
     * Set the index this receiver uses to track received notifications.
     *
     * The index is stored as given; startup() throws a
     * ConfigurationException if it is still null at that point.
     *
     * @param notificationIndex
     *            the notificationIndex to set
     */
    public void setNotificationIndex(NotificationIndex notificationIndex) {
        this.notificationIndex = notificationIndex;
    }

  605.     /**
  606.      * @return the productStorage
  607.      */
  608.     public ProductStorage getProductStorage() {
  609.         return productStorage;
  610.     }

    /**
     * Set the storage this receiver uses to store retrieved products.
     *
     * The storage is stored as given; startup() throws a
     * ConfigurationException if it is still null at that point.
     *
     * @param productStorage
     *            the productStorage to set
     */
    public void setProductStorage(ProductStorage productStorage) {
        this.productStorage = productStorage;
    }

  618.     /**
  619.      * @return the productStorageMaxAge
  620.      */
  621.     public Long getProductStorageMaxAge() {
  622.         return productStorageMaxAge;
  623.     }

    /**
     * Set the maximum age for products in storage.
     *
     * NOTE(review): the unit is not visible in this part of the file -
     * confirm against the code that applies this value.
     *
     * @param productStorageMaxAge
     *            the productStorageMaxAge to set
     */
    public void setProductStorageMaxAge(Long productStorageMaxAge) {
        this.productStorageMaxAge = productStorageMaxAge;
    }

  631.     /**
  632.      * @return the QueueStatus or null if ExecutorListenerNotifier doesn't exist
  633.      */
  634.     public Map<String, Integer> getQueueStatus() {
  635.         if (notifier instanceof ExecutorListenerNotifier) {
  636.             return ((ExecutorListenerNotifier) notifier).getStatus();
  637.         }
  638.         return null;
  639.     }

  640.     /**
  641.      * Throttle notifier queues
  642.      * @throws InterruptedException InterruptedException
  643.      */
  644.     public void throttleQueues() throws InterruptedException {
  645.         if (notifier instanceof ExecutorListenerNotifier) {
  646.             ((ExecutorListenerNotifier) notifier).throttleQueues();
  647.         }
  648.     }

  649.     /**
  650.      * @return receiverCleanupInterval
  651.      */
  652.     public Long getReceiverCleanupInterval() {
  653.         return receiverCleanupInterval;
  654.     }

    /**
     * Set the interval between runs of the receiver cleanup task.
     *
     * startup() passes this value as the period when scheduling the cleanup
     * task, and only schedules the task when it is greater than zero.
     *
     * NOTE(review): a null value looks unsafe because startup() compares this
     * value with zero - confirm callers never pass null.
     *
     * @param receiverCleanupInterval the receiverCleanupInterval to set
     */
    public void setReceiverCleanupInterval(Long receiverCleanupInterval) {
        this.receiverCleanupInterval = receiverCleanupInterval;
    }

  661.     /**
  662.      * @return connectionTimeout
  663.      */
  664.     public int getConnectTimeout() {
  665.         return connectTimeout;
  666.     }

    /**
     * Set the connect timeout.
     *
     * NOTE(review): the unit is not visible in this part of the file -
     * confirm where the value is applied.
     *
     * @param connectTimeout int connectTimeout to set
     */
    public void setConnectTimeout(int connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

  673.     /**
  674.      * @return ListenerNotifier
  675.      */
  676.     public ListenerNotifier getNotifier() {
  677.         return this.notifier;
  678.     }

    /**
     * Set the notifier this receiver starts in startup() and stops in
     * shutdown().
     *
     * getQueueStatus() and throttleQueues() only act when the notifier is an
     * ExecutorListenerNotifier.
     *
     * @param notifier ListenerNotifier to set
     */
    public void setNotifier(final ListenerNotifier notifier) {
        this.notifier = notifier;
    }

  685.     /**
  686.      * @return readTimeout
  687.      */
  688.     public int getReadTimeout() {
  689.         return readTimeout;
  690.     }

    /**
     * Set the read timeout.
     *
     * NOTE(review): the unit is not visible in this part of the file -
     * confirm where the value is applied.
     *
     * @param readTimeout int readTimeout to set
     */
    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

  697. }