SocketProductReceiver.java

/*
 * SocketProductReceiver
 */
package gov.usgs.earthquake.distribution;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.util.Config;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;

/**
 * Receive Products directly via a Socket.
 *
 * The received products are sent using a SocketProductSender.
 *
 * A SocketProductReceiver receives products directly and notifies listeners of
 * received notifications.
 *
 * These are typically used on hubs with an EIDSNotificationSender or
 * RelayProductReceiver.
 *
 * This class extends DefaultNotificationReceiver; {@link #configure(Config)}
 * first delegates to the parent and then reads the following additional
 * configuration parameters:
 *
 * <dl>
 * <dt>port</dt>
 * <dd>(Optional, default 11235) TCP port the server socket is bound to.</dd>
 * <dt>threads</dt>
 * <dd>(Optional, default 10) size of the fixed thread pool handed to the
 * SocketAcceptor.</dd>
 * <dt>sizeLimit</dt>
 * <dd>(Optional, default -1) size limit exposed via {@link #getSizeLimit()};
 * it is not enforced in this class (presumably consulted by
 * SocketProductReceiverHandler, with -1 meaning "no limit" - confirm
 * there).</dd>
 * </dl>
 */
public class SocketProductReceiver extends DefaultNotificationReceiver
		implements SocketListenerInterface {

	private static final String THREAD_POOL_SIZE_PROPERTY = "threads";

	private static final String DEFAULT_THREAD_POOL_SIZE = "10";

	private static final String PRODUCT_PORT_PROPERTY = "port";

	private static final String DEFAULT_PRODUCT_PORT = "11235";

	private static final String SIZE_LIMIT_PROPERTY = "sizeLimit";

	private static final String DEFAULT_SIZE_LIMIT = "-1";


	private static final Logger LOGGER = Logger
			.getLogger(SocketProductReceiver.class.getName());

	private int port = -1;
	private int threads = -1;
	private long sizeLimit = -1;

	/** Accepts connections; null until startup() succeeds and after shutdown(). */
	private SocketAcceptor acceptor = null;

	/**
	 * Default constructor setting port, threads, and sizeLimit to default
	 * @throws Exception if error occurs
	 */
	public SocketProductReceiver() throws Exception {
		super();
		this.port = Integer.parseInt(DEFAULT_PRODUCT_PORT);
		this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
		this.sizeLimit = Long.parseLong(DEFAULT_SIZE_LIMIT);
	}

	/**
	 * Constructor based on config file
	 * @param config Configuration file
	 * @throws Exception if error occurs
	 */
	public SocketProductReceiver(Config config) throws Exception {
		this();
		configure(config);
	}

	/**
	 * Configure the parent receiver, then read port, threads and sizeLimit
	 * from the configuration, falling back to the built-in defaults.
	 *
	 * @param config configuration to read properties from
	 * @throws Exception if the parent configuration fails or a property is
	 *             not a valid number
	 */
	public void configure(Config config) throws Exception {
		super.configure(config);
		this.port = Integer.parseInt(config.getProperty(PRODUCT_PORT_PROPERTY,
				DEFAULT_PRODUCT_PORT));
		LOGGER.config("[" + getName() + "]  port is '" + this.port + "'");

		this.threads = Integer.parseInt(config.getProperty(
				THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
		LOGGER.config("[" + getName() + "]  number of threads is '"
				+ this.threads + "'");

		this.sizeLimit = Long.parseLong(config.getProperty(
				SIZE_LIMIT_PROPERTY, DEFAULT_SIZE_LIMIT));
		LOGGER.config("[" + getName() + "] size limit is '"
				+ this.sizeLimit + "'");
	}

	/**
	 * Start the parent receiver, then bind the server socket and begin
	 * accepting connections.
	 *
	 * @throws Exception if the parent fails to start or the socket cannot be
	 *             bound
	 */
	public void startup() throws Exception {
		// call DefaultNotificationReceiver startup first
		super.startup();

		// SO_REUSEADDR has to be set before the socket is bound; changing it
		// on an already bound socket has undefined behavior. Create the socket
		// unbound, set the option, then bind to the wildcard address.
		ServerSocket socket = new ServerSocket();
		try {
			socket.setReuseAddress(true);
			socket.bind(new InetSocketAddress(port));
			acceptor = new SocketAcceptor(socket, this,
					Executors.newFixedThreadPool(threads));
		} catch (Exception e) {
			// do not leak the server socket when setup fails
			try {
				socket.close();
			} catch (Exception ignored) {
				// best effort, the original failure is rethrown below
			}
			throw e;
		}
		// start accepting connections via socket
		acceptor.start();
	}

	/**
	 * Stop accepting connections, then shut down the parent receiver.
	 *
	 * Safe to call when startup() never ran or failed; the parent shutdown
	 * runs even if stopping the acceptor throws.
	 *
	 * @throws Exception if stopping the acceptor or the parent shutdown fails
	 */
	public void shutdown() throws Exception {
		try {
			// stop accepting connections
			if (acceptor != null) {
				acceptor.stop();
				acceptor = null;
			}
		} finally {
			// call DefaultNotificationReceiver shutdown last
			super.shutdown();
		}
	}

	/**
	 * Handle one accepted connection.
	 *
	 * Runs a SocketProductReceiverHandler synchronously on the calling
	 * thread, logs (but does not propagate) any exception it throws, and
	 * always closes the socket afterwards.
	 *
	 * @param socket the accepted connection
	 */
	public void onSocket(Socket socket) {
		LOGGER.info("[" + getName() + "] accepted connection "
				+ socket.toString());

		try {
			new SocketProductReceiverHandler(this, socket).run();
		} catch (Exception e) {
			LOGGER.log(Level.WARNING,
					"[" + getName() + "] uncaught exception processing "
					+ socket.toString(), e);
		} finally {
			// each step is best effort: the peer may already have closed
			// one or both directions of the connection
			try {
				socket.shutdownInput();
			} catch (Exception ignored) {
				// ignore
			}
			try {
				socket.shutdownOutput();
			} catch (Exception ignored) {
				// ignore
			}

			try {
				socket.close();
			} catch (Exception ignored) {
				// ignore
			}
		}

		LOGGER.info("[" + getName() + "] closed connection "
				+ socket.toString());
	}

	/**
	 * Stores ProductSource as a notification, tracks it, and notifies Listeners
	 * @param source ProductSource
	 * @return String note for log file
	 * @throws Exception if error occurs
	 */
	protected String storeAndNotify(final ProductSource source)
			throws Exception {
		Notification notification = storeProductSource(source);
		if (notification == null) {
			throw new Exception("[" + getName()
					+ "] unknown error, no notification generated");
		}

		// note in log file
		String message = "[" + getName() + "] received product '"
				+ notification.getProductId().toString() + "'\n";

		// product successfully stored, and notification index updated.
		new ProductTracker(notification.getTrackerURL()).productReceived(
				this.getName(), notification.getProductId());

		// notify listeners of newly available product.
		notifyListeners(notification);

		return message;
	}

	/** @return port */
	public int getPort() {
		return port;
	}

	/** @param port int to set */
	public void setPort(int port) {
		this.port = port;
	}

	/** @return sizeLimit */
	public long getSizeLimit() {
		return sizeLimit;
	}

	/** @param sizeLimit long to set */
	public void setSizeLimit(long sizeLimit) {
		this.sizeLimit = sizeLimit;
	}

	/** @return threads */
	public int getThreads() {
		return threads;
	}

	/** @param threads int to set */
	public void setThreads(int threads) {
		this.threads = threads;
	}

}