DefaultNotificationReceiver.java

/*
 * DefaultNotificationReceiver
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.aws.JsonNotification;
import gov.usgs.earthquake.distribution.roundrobinnotifier.RoundRobinListenerNotifier;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.SizeLimitInputStream;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.ObjectLock;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Logger;

import javax.json.Json;

import java.util.logging.Level;

/**
 * The core of product distribution.
 *
 * A DefaultNotificationReceiver receives notifications and notifies listeners
 * of received notifications. NotificationListeners use the NotificationReceiver
 * to retrieve products referenced by notifications.
 *
 * The NotificationReceiver uses a NotificationIndex to track received
 * notifications, and a ProductStorage to store retrieved products.
 *
 * The DefaultNotificationReceiver implements the Configurable interface and
 * uses the following configuration parameters:
 *
 * Each listener has a separate queue of notifications. Each listener is
 * allocated one thread to process notifications from this queue.
 */
public class DefaultNotificationReceiver extends DefaultConfigurable implements
		NotificationReceiver, NotificationIndexCleanup.Listener {

	/** Logging object. */
	private static final Logger LOGGER = Logger
			.getLogger(DefaultNotificationReceiver.class.getName());

	/** Property referencing a notification index config section. */
	public static final String NOTIFICATION_INDEX_PROPERTY = "index";

	/** Shortcut to create a SQLite JDBCNotificationIndex. */
	public static final String INDEX_FILE_PROPERTY = "indexFile";

	/** Property referencing a product storage config section. */
	public static final String PRODUCT_STORAGE_PROPERTY = "storage";

	/** Shortcut to create a FileProductStorage. */
	public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

	/** Property referencing how long to store products in milliseconds. */
	public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageAge";

	/** Default max age to store products, 3600000 milliseconds = 1 hour. */
	public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "3600000";

	/**
	 * Property referencing how long to wait until checking for expired
	 * notifications/products.
	 */
	public static final String RECEIVER_CLEANUP_PROPERTY = "cleanupInterval";

	/**
	 * Default time between checking for expired notifications/products, 900000
	 * milliseconds = 15 minutes.
	 */
	public static final String DEFAULT_RECEIVER_CLEANUP = "900000";

	/** Property for connection Timeout */
	public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
	/** Default connection timeout. 15 seconds */
	public static final String DEFAULT_CONNECT_TIMEOUT = "15000";
	/** Property for read timeout */
	public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
	/** default read timeout. 15 seconds */
	public static final String DEFAULT_READ_TIMEOUT = "15000";

	/** Property for listener notifier */
	public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
	/** Property for listener notifier to set to executor*/
	public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
	/** Property to listener notifier to set to future */
	public static final String FUTURE_LISTENER_NOTIFIER = "future";
	/** Property to listener notifier to set to roundrobin */
	public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";

	/** The notification index where received notifications are stored. */
	private NotificationIndex notificationIndex;

	/** The product storage where retrieved products are stored. */
	private ProductStorage productStorage;

	/** How long to store retrieved product, in milliseconds. */
	private Long productStorageMaxAge = 0L;

	/** How long to wait until checking for expired notifications/products. */
	private Long receiverCleanupInterval = 0L;

	/** Timer that schedules receiver cleanup task. */
	private Timer receiverCleanupTimer = new Timer();

	/** Notification cleanup */
	private NotificationIndexCleanup notificationCleanup = null;

	private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT);
	private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT);

	private ListenerNotifier notifier;

	/** A lock that is acquired when a product is being retrieved. */
	private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();

	/** Creates new ExecutorListenerNotifier to var notifier */
	public DefaultNotificationReceiver() {
		notifier = new ExecutorListenerNotifier(this);
	}

	/**
	 * Add a new notification listener.
	 *
	 * @param listener
	 *            the listener to add. When notifications are received, this
	 *            listener will be notified.
	 * @throws Exception exception
	 */
	public void addNotificationListener(NotificationListener listener)
			throws Exception {
		notifier.addNotificationListener(listener);
	}

	/**
	 * Remove an existing notification listener.
	 *
	 * Any currently queued notifications are processed before shutting down.
	 *
	 * @param listener
	 *            the listener to remove. When notifications are receive, this
	 *            listener will no longer be notified.
	 * @throws Exception exception
	 */
	public void removeNotificationListener(NotificationListener listener)
			throws Exception {
		notifier.removeNotificationListener(listener);
	}

	/**
	 * Store a notification and notify listeners.
	 *
	 * Updates the notification index before notifying listeners of the newly
	 * available product.
	 *
	 * @param notification
	 *            the notification being received.
	 * @throws Exception
	 *             if the notificationIndex throws an Exception.
	 */
	public void receiveNotification(Notification notification) throws Exception {
		// notification processed
		new ProductTracker(notification.getTrackerURL()).notificationReceived(
				this.getName(), notification);

		if (notification.getExpirationDate().before(new Date())) {
			LOGGER.finer("[" + getName()
					+ "] skipping already expired notification for product id="
					+ notification.getProductId().toString() + ", expiration="
					+ notification.getExpirationDate().toString());
		} else {
			// add notification to index
			notificationIndex.addNotification(notification);

			if (notification instanceof JsonNotification) {
				LOGGER.finer("[" + getName() + "] json notification " +
						notification.getProductId());
			} else if (notification instanceof URLNotification) {
				LOGGER.finer("["
						+ getName()
						+ "] notification URL="
						+ ((URLNotification) notification).getProductURL()
								.toString());
			}

			notifyListeners(notification);
		}
	}

	/**
	 * Send a notification to all registered NotificationListeners.
	 *
	 * Creates a NotificationEvent, with a reference to this object and calls
	 * each notificationListeners onNotification method in separate threads.
	 *
	 * This method usually returns before registered NotificationListeners have
	 * completed processing a notification.
	 *
	 * @param notification
	 *            the notification being sent to listeners.
	 * @throws Exception exception
	 */
	protected void notifyListeners(final Notification notification)
			throws Exception {

		LOGGER.finest("[" + getName() + "] notifying listeners for product id="
				+ notification.getProductId().toString());

		// queue notification for listeners
		NotificationEvent event = new NotificationEvent(this, notification);
		notifier.notifyListeners(event);
	}
	/** @return "Using notifier" */
	public String getListenerQueueStatus() {
		return "Using notifier";
	}

	/**
	 * Search the notification index for expired notifications, removing any
	 * that are found. When a notification in the index is not a
	 * URLNotification, it represents a product in storage that will also be
	 * removed.
	 *
	 * @throws Exception
	 *             if NotificationIndexCleanup throws an Exception.
	 */
	public void removeExpiredNotifications() throws Exception {
		LOGGER.fine("[" + getName() + "] running receiver cleanup");
		// use NotificationIndexCleanup to manage cleanup in separate thread
		if (this.notificationCleanup == null) {
			this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
			this.notificationCleanup.startup();
		} else {
			this.notificationCleanup.wakeUp();
		}
	}

	/**
	 * Callback from the NotificationIndexCleanup thread.
	 *
	 * Checks if Notification refers to a product in storage,
	 * which should also be removed.
	 *
	 * @param notification
	 *     expired notification about to be removed.
	 * @throws Exception
	 */
	public void onExpiredNotification(final Notification notification) throws Exception {
		if (!(notification instanceof URLNotification)) {
			// if it isn't a url notification, it's also in storage
			productStorage.removeProduct(notification.getProductId());
			if (LOGGER.isLoggable(Level.FINEST)) {
				LOGGER.finest("[" + getName()
						+ "] removed expired product from receiver cache "
						+ notification.getProductId().toString());
			}
		}
	}

	/**
	 * Retrieve a product by id.
	 *
	 * If this product is already in storage, load and return the product.
	 * Otherwise, search notifications for this product, and download the
	 * product into storage.
	 *
	 * @param id
	 *            the product to retrieve
	 * @return the retrieved product, or null if not available.
	 * @throws Exception exception
	 */
	public Product retrieveProduct(ProductId id) throws Exception {
		Product product = null;
		String productIdString = id.toString();

		LOGGER.finest("[" + getName() + "] acquiring retrieve lock id="
				+ productIdString);
		retrieveLocks.acquireLock(id);
		LOGGER.finest("[" + getName() + "] retrieve lock acquired id="
				+ productIdString);
		try {
			if (productStorage.hasProduct(id)) {
				try {
					LOGGER.finest("[" + getName() + "] storing product id="
							+ productIdString);
					product = productStorage.getProduct(id);
					LOGGER.finest("[" + getName() + "] product stored id="
							+ productIdString);
				} catch (Exception e) {
					LOGGER.log(
							Level.FINE,
							"["
									+ getName()
									+ "] storage claims hasProduct, but threw exception",
							e);
					product = null;
				}
			}

			if (product == null) {
				LOGGER.finer("[" + getName()
						+ "] don't have product yet, searching notifications");
				// don't have product yet, search notifications
				Iterator<Notification> iter = notificationIndex
						.findNotifications(id).iterator();
				while (product == null && iter.hasNext()) {
					Notification notification = iter.next();
					if (!(notification instanceof URLNotification)) {
						// only URL notifications include location info
						continue;
					}

					InputStream in = null;
					try {
						URL productURL = ((URLNotification) notification)
								.getProductURL();

						ProductSource productSource = null;
						SizeLimitInputStream sizeIn = null;

						final Date beginConnect = new Date();
						Date beginDownload = new Date();
						if (productURL.getProtocol().equals("data")) {
							product = new JsonProduct().getProduct(Json.createReader(
									StreamUtils.getInputStream(productURL)).readObject());
							// JSON notification with embedded product
							LOGGER.finer("[" + getName() + "] parsed json notification for "
									+ product.getId().toString());
							productSource = new ObjectProductSource(product);
						} else {
							// URL notification
							LOGGER.finer("[" + getName() + "] notification url "
									+ productURL.toString());

							in = StreamUtils.getURLInputStream(productURL,
									connectTimeout, readTimeout);
							beginDownload = new Date();
							// use size limit with negative limit to count transfer size
							sizeIn = new SizeLimitInputStream(in, -1);
							productSource = IOUtil.autoDetectProductSource(sizeIn);
						}

						Notification storedNotification = storeProductSource(productSource);

						final Date endDownload = new Date();
						final long connectTime = beginDownload.getTime() - beginConnect.getTime();
						final long downloadTime = endDownload.getTime() - beginDownload.getTime();
						final long downloadSize = sizeIn != null ? sizeIn.getRead() : 0;
						final long downloadRate = Math.round(downloadSize /
								(Math.max(downloadTime, 1L) / 1000.0));

						LOGGER.fine("[" + getName() + "] receiver retrieved product"
								+ " id=" + id.toString()
								+ " (connect = " + connectTime + " ms)"
								+ " (rate = " + downloadRate +  " bytes/s)"
								+ " (size = " + downloadSize + " bytes)"
								+ " (time = " + downloadTime + " ms)"
								+ " from "
								+ (productURL.getProtocol().equals("data")
										? "data url"
										: productURL.toString()));

						LOGGER.finest("[" + getName()
								+ "] after store product, notification="
								+ storedNotification);

						if (productStorage.hasProduct(id)) {
							LOGGER.finer("[" + getName()
									+ "] getting product from storage");
							product = productStorage.getProduct(id);
							LOGGER.finest("[" + getName()
									+ "] after getProduct, product=" + product);

							try {
								new ProductTracker(notification.getTrackerURL())
										.productDownloaded(this.getName(), id);
								LOGGER.fine("[" + getName()
										+ "] product downloaded from "
										+ (productURL.getProtocol().equals("data")
												? "data url"
												: productURL.toString()));
							} catch (Exception e) {
								LOGGER.log(
										Level.WARNING,
										"["
												+ getName()
												+ "] exception notifying tracker about downloaded product",
										e);
							}
						} else {
							LOGGER.finer("[" + getName()
									+ "] product not in storage id="
									+ productIdString);
						}
					} catch (Exception e) {
						if (e instanceof ProductAlreadyInStorageException
								|| e.getCause() instanceof ProductAlreadyInStorageException) {
							LOGGER.finer("[" + getName()
									+ "] product already in storage id="
									+ productIdString);
							product = productStorage.getProduct(id);
							continue;
						}

						// log any exception that happened while retrieving
						// product
						if (e instanceof FileNotFoundException) {
							LOGGER.warning("["
									+ getName()
									+ "] exception while retrieving product, file not found");
						} else {
							LOGGER.log(Level.WARNING, "[" + getName()
									+ "] exception while retrieving product", e);
							new ProductTracker(notification.getTrackerURL())
									.exception(this.getName(), id, e);
						}
					} finally {
						StreamUtils.closeStream(in);
					}
				}
			}
		} finally {
			LOGGER.finest("[" + getName() + "] releasing retrieve lock id="
					+ productIdString);
			retrieveLocks.releaseLock(id);
			LOGGER.finest("[" + getName() + "] retrieve lock released id="
					+ productIdString);
		}

		// return product
		return product;
	}

	/**
	 * Calls the current <code>ProductStorage.storeProductSource</code> method.
	 *
	 * @param source
	 *            The <code>ProductSource</code> to store.
	 * @return The <code>ProductId</code> of the product referenced by the given
	 *         <code>ProductSource</code>.
	 * @throws Exception exception
	 * @see gov.usgs.earthquake.distribution.ProductStorage
	 */
	protected Notification storeProductSource(ProductSource source)
			throws Exception {
		Notification notification = null;

		// store product input
		ProductId id = productStorage.storeProductSource(source);

		// check if stored
		if (productStorage.hasProduct(id)) {
			Product product = productStorage.getProduct(id);

			// calculate storage expiration date
			Date expirationDate = new Date(new Date().getTime()
					+ productStorageMaxAge);

			// update notification index
			notification = new DefaultNotification(id, expirationDate,
					product.getTrackerURL());
			notificationIndex.addNotification(notification);
		}

		return notification;
	}

	/**
	 * Send matching notifications to listener.
	 *
	 * Searches the NotificationIndex for matching notifications, and sends a
	 * NotificationEvent for each notification found.
	 *
	 * @param listener
	 *            the listener to receive a NotificationEvent for each found
	 *            notification.
	 * @param sources
	 *            sources to include, or null for all.
	 * @param types
	 *            types to include, or null for all.
	 * @param codes
	 *            codes to include, or null for all.
	 * @throws Exception
	 *             if the notification index or notification listener throw an
	 *             exception.
	 */
	public void sendNotifications(NotificationListener listener,
			List<String> sources, List<String> types, List<String> codes)
			throws Exception {
		List<Notification> notifications = notificationIndex.findNotifications(
				sources, types, codes);
		Iterator<Notification> iter = notifications.iterator();
		while (iter.hasNext()) {
			listener.onNotification(new NotificationEvent(this, iter.next()));
		}
	}

	public void configure(Config config) throws Exception {
		String notificationIndexName = config
				.getProperty(NOTIFICATION_INDEX_PROPERTY);
		String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
		if (notificationIndexName == null && notificationIndexFile == null) {
			throw new ConfigurationException("[" + getName()
					+ "] 'index' is a required configuration property");
		}
		if (notificationIndexName != null) {
			LOGGER.config("[" + getName() + "] loading notification index '"
					+ notificationIndexName + "'");
			notificationIndex = (NotificationIndex) Config.getConfig()
					.getObject(notificationIndexName);
			if (notificationIndex == null) {
				throw new ConfigurationException("[" + getName() + "] index '"
						+ notificationIndexName
						+ "' is not properly configured");
			}
		} else {
			LOGGER.config("[" + getName() + "] using notification index '"
					+ notificationIndexFile + "'");
			notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
		}

		String productStorageName = config
				.getProperty(PRODUCT_STORAGE_PROPERTY);
		String storageDirectory = config
				.getProperty(STORAGE_DIRECTORY_PROPERTY);
		if (productStorageName == null && storageDirectory == null) {
			throw new ConfigurationException("[" + getName()
					+ "] 'storage' is a required configuration property");
		}
		if (productStorageName != null) {
			LOGGER.config("[" + getName() + "] loading product storage '"
					+ productStorageName + "'");
			productStorage = (ProductStorage) Config.getConfig().getObject(
					productStorageName);
			if (productStorage == null) {
				throw new ConfigurationException("[" + getName()
						+ "] storage '" + productStorageName
						+ "' is not properly configured");
			}
		} else {
			LOGGER.config("[" + getName() + "] using storage directory '"
					+ storageDirectory + "'");
			productStorage = new FileProductStorage(new File(storageDirectory));
		}

		productStorageMaxAge = Long.parseLong(config.getProperty(
				PRODUCT_STORAGE_MAX_AGE_PROPERTY,
				// previously all lower-case
				config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY.toLowerCase(),
						DEFAULT_PRODUCT_STORAGE_MAX_AGE)));
		LOGGER.config("[" + getName() + "] storage max age "
				+ productStorageMaxAge + " ms");

		receiverCleanupInterval = Long.parseLong(config.getProperty(
				RECEIVER_CLEANUP_PROPERTY, DEFAULT_RECEIVER_CLEANUP));
		LOGGER.config("[" + getName() + "] receiver cleanup interval "
				+ receiverCleanupInterval + " ms");

		connectTimeout = Integer.parseInt(config.getProperty(
				CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
		LOGGER.config("[" + getName() + "] receiver connect timeout "
				+ connectTimeout + " ms");

		readTimeout = Integer.parseInt(config.getProperty(
				READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
		LOGGER.config("[" + getName() + "] receiver read timeout "
				+ readTimeout + " ms");

		String notifierType = config.getProperty(LISTENER_NOTIFIER_PROPERTY);
		if (notifierType != null) {
			if (notifierType.equals(EXECUTOR_LISTENER_NOTIFIER)) {
				notifier = new ExecutorListenerNotifier(this);
				LOGGER.config("[" + getName()
						+ "] using executor listener notifier");
			} else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
				notifier = new FutureListenerNotifier(this);
			} else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
				notifier = new RoundRobinListenerNotifier(this);
				LOGGER.config("[" + getName()
						+ "] using round-robin listener notifier");
			} else {
				throw new ConfigurationException("Unknown notifier type "
						+ notifierType);
			}
		}
	}

	public void shutdown() throws Exception {
		receiverCleanupTimer.cancel();
		if (notificationCleanup != null) {
			try {
				notificationCleanup.shutdown();
				notificationCleanup = null;
			} catch (Exception ignore) {
			}
		}
		try {
			notifier.shutdown();
		} catch (Exception ignore) {
		}
		try {
			notificationIndex.shutdown();
		} catch (Exception ignore) {
		}
		try {
			productStorage.shutdown();
		} catch (Exception ignore) {
		}
	}

	public void startup() throws Exception {
		if (productStorage == null) {
			throw new ConfigurationException("[" + getName()
					+ "] storage has not been configured properly");
		}
		if (notificationIndex == null) {
			throw new ConfigurationException("[" + getName()
					+ "] index has not been configured properly");
		}
		productStorage.startup();
		notificationIndex.startup();

		// only schedule cleanup if interval is non-zero
		if (receiverCleanupInterval > 0) {
			receiverCleanupTimer.scheduleAtFixedRate(new TimerTask() {
				public void run() {
					try {
						removeExpiredNotifications();
					} catch (Exception e) {
						LOGGER.log(Level.WARNING, "[" + getName()
								+ "] exception during receiver cleanup", e);
					}
				}
			}, 0, receiverCleanupInterval);
		}

		// do this last since it may start processing
		notifier.startup();

		// ProductClient already started these listeners...
		// Iterator<NotificationListener> iter = notificationListeners.keySet()
		// .iterator();
		// while (iter.hasNext()) {
		// iter.next().startup();
		// }
	}

	/**
	 * @return the notificationIndex
	 */
	public NotificationIndex getNotificationIndex() {
		return notificationIndex;
	}

	/**
	 * @param notificationIndex
	 *            the notificationIndex to set
	 */
	public void setNotificationIndex(NotificationIndex notificationIndex) {
		this.notificationIndex = notificationIndex;
	}

	/**
	 * @return the productStorage
	 */
	public ProductStorage getProductStorage() {
		return productStorage;
	}

	/**
	 * @param productStorage
	 *            the productStorage to set
	 */
	public void setProductStorage(ProductStorage productStorage) {
		this.productStorage = productStorage;
	}

	/**
	 * @return the productStorageMaxAge
	 */
	public Long getProductStorageMaxAge() {
		return productStorageMaxAge;
	}

	/**
	 * @param productStorageMaxAge
	 *            the productStorageMaxAge to set
	 */
	public void setProductStorageMaxAge(Long productStorageMaxAge) {
		this.productStorageMaxAge = productStorageMaxAge;
	}

	/**
	 * @return the QueueStatus or null if ExecutorListenerNotifier doesn't exist
	 */
	public Map<String, Integer> getQueueStatus() {
		if (notifier instanceof ExecutorListenerNotifier) {
			return ((ExecutorListenerNotifier) notifier).getStatus();
		}
		return null;
	}

	/**
	 * Throttle notifier queues
	 * @throws InterruptedException InterruptedException
	 */
	public void throttleQueues() throws InterruptedException {
		if (notifier instanceof ExecutorListenerNotifier) {
			((ExecutorListenerNotifier) notifier).throttleQueues();
		}
	}

	/**
	 * @return receiverCleanupInterval
	 */
	public Long getReceiverCleanupInterval() {
		return receiverCleanupInterval;
	}

	/**
	 * @param receiverCleanupInterval the receiverCleanupInterval to set
	 */
	public void setReceiverCleanupInterval(Long receiverCleanupInterval) {
		this.receiverCleanupInterval = receiverCleanupInterval;
	}

	/**
	 * @return connectionTimeout
	 */
	public int getConnectTimeout() {
		return connectTimeout;
	}

	/**
	 * @param connectTimeout int connectionTimeout to set
	 */
	public void setConnectTimeout(int connectTimeout) {
		this.connectTimeout = connectTimeout;
	}

	/**
	 * @return ListenerNotifier
	 */
	public ListenerNotifier getNotifier() {
		return this.notifier;
	}

	/**
	 * @param notifier ListenerNotifier to set
	 */
	public void setNotifier(final ListenerNotifier notifier) {
		this.notifier = notifier;
	}

	/**
	 * @return readTimeout
	 */
	public int getReadTimeout() {
		return readTimeout;
	}

	/**
	 * @param readTimeout int readTimeout to set
	 */
	public void setReadTimeout(int readTimeout) {
		this.readTimeout = readTimeout;
	}

}