SearchServerSocket.java

package gov.usgs.earthquake.indexer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
import gov.usgs.util.StreamUtils;

/**
 * Server side of socket search interface.
 *
 * Listens on a configurable port. For each accepted connection, reads one
 * compressed search request, executes it against the configured
 * {@link Indexer}, and writes one compressed search response before closing
 * the connection.
 */
public class SearchServerSocket extends DefaultConfigurable implements
		SocketListenerInterface {

	/** Logging object. */
	private static final Logger LOGGER = Logger
			.getLogger(SearchServerSocket.class.getName());

	/** The configuration property used for listen port. */
	public static final String SEARCH_PORT_PROPERTY = "port";

	/** The default listen port, as a string. */
	public static final String DEFAULT_SEARCH_PORT = "11236";

	/** The configuration property used for listen thread count. */
	public static final String THREAD_POOL_SIZE_PROPERTY = "threads";

	/** The default number of threads, as a string. */
	public static final String DEFAULT_THREAD_POOL_SIZE = "10";

	/** The configuration property used to reference an Indexer. */
	public static final String PRODUCT_INDEXER_PROPERTY = "indexer";

	/**
	 * The configuration property used to reference a URLProductStorage.
	 *
	 * Not read by {@link #configure(Config)} in this class.
	 */
	public static final String PRODUCT_STORAGE_PROPERTY = "storage";

	/** The port to bind. */
	private int port = -1;

	/** The number of threads to use. */
	private int threads = -1;

	/** The server socket accept thread; null while not started. */
	private SocketAcceptor acceptor;

	/** The indexer that will be searched. */
	private Indexer indexer;

	/**
	 * Construct a new SearchServerSocket using defaults.
	 *
	 * The port and thread count are initialized from
	 * {@link #DEFAULT_SEARCH_PORT} and {@link #DEFAULT_THREAD_POOL_SIZE}.
	 */
	public SearchServerSocket() {
		this.port = Integer.parseInt(DEFAULT_SEARCH_PORT);
		this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
	}

	/**
	 * Method to perform search.
	 *
	 * Calls Indexer.search(SearchRequest). Simplifies testing.
	 *
	 * @param request
	 *            the search to execute.
	 * @return the search response.
	 * @throws Exception if error occurs
	 */
	protected SearchResponse search(final SearchRequest request)
			throws Exception {
		return indexer.search(request);
	}

	/**
	 * This method is called each time a SearchSocket connects.
	 *
	 * Reads a compressed request from the socket, runs the search, and
	 * writes a compressed response. Any exception is logged at WARNING and
	 * not rethrown; the socket is always closed before this method returns.
	 *
	 * @param socket
	 *            the accepted client connection.
	 */
	@Override
	public void onSocket(Socket socket) {
		LOGGER.info("[" + getName() + "] accepted search connection "
				+ socket.toString());

		InputStream in = null;
		DeflaterOutputStream out = null;

		try {
			in = socket.getInputStream();
			// wrap the raw stream so closing the inflater stream does not
			// close the socket before the response has been written
			in = new InflaterInputStream(new BufferedInputStream(
					new StreamUtils.UnclosableInputStream(in)));
			// read request
			SearchRequest request = SearchXML
					.parseRequest(new StreamUtils.UnclosableInputStream(in));

			// do search
			SearchResponse response = this.search(request);

			// send response
			out = new DeflaterOutputStream(new BufferedOutputStream(
					socket.getOutputStream()));
			SearchXML.toXML(response, new StreamUtils.UnclosableOutputStream(
					out));

			// finish compression
			out.finish();
			out.flush();
		} catch (Exception ex) {
			LOGGER.log(Level.WARNING, "[" + getName()
					+ "] exception while processing search", ex);
		} finally {
			StreamUtils.closeStream(in);
			// NOTE(review): closing "out" closes the socket output stream,
			// which per java.net.Socket closes the socket itself; the
			// shutdown calls below then only matter when "out" was never
			// created. Presumably closeStream tolerates null - confirm.
			StreamUtils.closeStream(out);

			try {
				socket.shutdownInput();
			} catch (Exception e) {
				// ignore
			}
			try {
				socket.shutdownOutput();
			} catch (Exception e) {
				// ignore
			}

			try {
				socket.close();
			} catch (Exception e) {
				// ignore
			}
		}

		LOGGER.info("[" + getName() + "] closed search connection "
				+ socket.toString());
	}

	/**
	 * Read port, thread count, and indexer from configuration.
	 *
	 * @param config
	 *            the configuration section for this object.
	 * @throws ConfigurationException
	 *             if the indexer property is missing, or does not name a
	 *             configured {@link Indexer}.
	 * @throws NumberFormatException
	 *             if the port or thread count is not an integer.
	 */
	@Override
	public void configure(Config config) throws Exception {
		port = Integer.parseInt(config.getProperty(SEARCH_PORT_PROPERTY,
				DEFAULT_SEARCH_PORT));
		LOGGER.config("[" + getName() + "] search port is " + port);

		threads = Integer.parseInt(config.getProperty(
				THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
		LOGGER.config("[" + getName() + "] number of threads is " + threads);

		String indexerName = config.getProperty(PRODUCT_INDEXER_PROPERTY);
		if (indexerName == null) {
			throw new ConfigurationException("[" + getName() + "] '"
					+ PRODUCT_INDEXER_PROPERTY
					+ "' is a required configuration property");
		}
		LOGGER.config("[" + getName() + "] loading indexer '" + indexerName
				+ "'");
		// instanceof rejects both a missing object (null) and an object of
		// the wrong type, so a misconfigured reference is reported as a
		// ConfigurationException instead of a raw ClassCastException
		Object loaded = Config.getConfig().getObject(indexerName);
		if (!(loaded instanceof Indexer)) {
			throw new ConfigurationException("[" + getName() + "] indexer '"
					+ indexerName + "' is not configured properly");
		}
		indexer = (Indexer) loaded;
	}

	/**
	 * Stop accepting connections.
	 *
	 * Does nothing when the server was never started or is already shut
	 * down.
	 *
	 * @throws Exception if the acceptor fails to stop.
	 */
	@Override
	public void shutdown() throws Exception {
		if (acceptor == null) {
			// never started, or already stopped
			return;
		}
		// stop accepting connections
		acceptor.stop();
		acceptor = null;
	}

	/**
	 * Bind the listen port and start accepting connections.
	 *
	 * @throws Exception
	 *             if the port cannot be bound, or the acceptor or its
	 *             thread pool cannot be created or started.
	 */
	@Override
	public void startup() throws Exception {
		// create unbound: SO_REUSEADDR must be set before bind, its effect
		// on an already bound socket is undefined
		ServerSocket socket = new ServerSocket();
		try {
			socket.setReuseAddress(true);
			socket.bind(new InetSocketAddress(port));
			SocketAcceptor started = new SocketAcceptor(socket, this,
					Executors.newFixedThreadPool(threads));
			// start accepting connections via socket
			started.start();
			acceptor = started;
		} catch (Exception e) {
			// do not leave the port bound when startup fails
			try {
				socket.close();
			} catch (Exception ignored) {
				// ignore, the startup failure is the useful exception
			}
			throw e;
		}
	}

	/** @return int port */
	public int getPort() {
		return port;
	}

	/** @param port int to set */
	public void setPort(int port) {
		this.port = port;
	}

	/** @return int threads */
	public int getThreads() {
		return threads;
	}

	/** @param threads int to set */
	public void setThreads(int threads) {
		this.threads = threads;
	}

	/** @return indexer */
	public Indexer getIndexer() {
		return indexer;
	}

	/** @param indexer to set */
	public void setIndex(Indexer indexer) {
		this.indexer = indexer;
	}

}