SearchServerSocket.java
- package gov.usgs.earthquake.indexer;
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.InputStream;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.concurrent.Executors;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import java.util.zip.DeflaterOutputStream;
- import java.util.zip.InflaterInputStream;
- import gov.usgs.earthquake.distribution.ConfigurationException;
- import gov.usgs.util.Config;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.SocketAcceptor;
- import gov.usgs.util.SocketListenerInterface;
- import gov.usgs.util.StreamUtils;
- /**
- * Server side of socket search interface.
- */
- public class SearchServerSocket extends DefaultConfigurable implements
- SocketListenerInterface {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(SearchServerSocket.class.getName());
- /** The configuration property used for listen port. */
- public static final String SEARCH_PORT_PROPERTY = "port";
- /** The default listen port, as a string. */
- public static final String DEFAULT_SEARCH_PORT = "11236";
- /** The configuration property used for listen thread count. */
- public static final String THREAD_POOL_SIZE_PROPERTY = "threads";
- /** The default number of threads, as a string. */
- public static final String DEFAULT_THREAD_POOL_SIZE = "10";
- /** The configuration property used to reference a ProductIndex. */
- public static final String PRODUCT_INDEXER_PROPERTY = "indexer";
- /** The configuration property used to reference a URLProductStorage. */
- public static final String PRODUCT_STORAGE_PROPERTY = "storage";
- /** The port to bind. */
- private int port = -1;
- /** The number of threads to use. */
- private int threads = -1;
- /** The server socket accept thread. */
- private SocketAcceptor acceptor;
- /** The indexer that will be searched. */
- private Indexer indexer;
- /**
- * Construct a new SearchServerSocket using defaults.
- */
- public SearchServerSocket() {
- this.port = Integer.parseInt(DEFAULT_SEARCH_PORT);
- this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
- }
- /**
- * Method to perform search.
- *
- * Calls Indexer.search(SearchRequest). Simplifies testing.
- *
- * @param request
- * the search to execute.
- * @return the search response.
- * @throws Exception if error occurs
- */
- protected SearchResponse search(final SearchRequest request)
- throws Exception {
- return indexer.search(request);
- }
- /**
- * This method is called each time a SearchSocket connects.
- */
- @Override
- public void onSocket(Socket socket) {
- LOGGER.info("[" + getName() + "] accepted search connection "
- + socket.toString());
- InputStream in = null;
- DeflaterOutputStream out = null;
- try {
- in = socket.getInputStream();
- in = new InflaterInputStream(new BufferedInputStream(
- new StreamUtils.UnclosableInputStream(in)));
- // read request
- SearchRequest request = SearchXML
- .parseRequest(new StreamUtils.UnclosableInputStream(in));
- // do search
- SearchResponse response = this.search(request);
- // send response
- out = new DeflaterOutputStream(new BufferedOutputStream(
- socket.getOutputStream()));
- SearchXML.toXML(response, new StreamUtils.UnclosableOutputStream(
- out));
- // finish compression
- out.finish();
- out.flush();
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception while processing search", ex);
- } finally {
- StreamUtils.closeStream(in);
- StreamUtils.closeStream(out);
- try {
- socket.shutdownInput();
- } catch (Exception e) {
- // ignore
- }
- try {
- socket.shutdownOutput();
- } catch (Exception e) {
- // ignore
- }
- try {
- socket.close();
- } catch (Exception e) {
- // ignore
- }
- }
- LOGGER.info("[" + getName() + "] closed search connection "
- + socket.toString());
- }
- @Override
- public void configure(Config config) throws Exception {
- port = Integer.parseInt(config.getProperty(SEARCH_PORT_PROPERTY,
- DEFAULT_SEARCH_PORT));
- LOGGER.config("[" + getName() + "] search port is " + port);
- threads = Integer.parseInt(config.getProperty(
- THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
- LOGGER.config("[" + getName() + "] number of threads is " + threads);
- String indexerName = config.getProperty(PRODUCT_INDEXER_PROPERTY);
- if (indexerName == null) {
- throw new ConfigurationException("[" + getName() + "] '"
- + PRODUCT_INDEXER_PROPERTY
- + "' is a required configuration property");
- }
- LOGGER.config("[" + getName() + "] loading indexer '" + indexerName
- + "'");
- indexer = (Indexer) Config.getConfig().getObject(indexerName);
- if (indexer == null) {
- throw new ConfigurationException("[" + getName() + "] indexer '"
- + indexerName + "' is not configured properly");
- }
- }
- @Override
- public void shutdown() throws Exception {
- // stop accepting connections
- acceptor.stop();
- acceptor = null;
- }
- @Override
- public void startup() throws Exception {
- ServerSocket socket = new ServerSocket(port);
- socket.setReuseAddress(true);
- acceptor = new SocketAcceptor(socket, this,
- Executors.newFixedThreadPool(threads));
- // start accepting connections via socket
- acceptor.start();
- }
- /** @return int port */
- public int getPort() {
- return port;
- }
- /** @param port int to set */
- public void setPort(int port) {
- this.port = port;
- }
- /** @return int threads */
- public int getThreads() {
- return threads;
- }
- /** @param threads into to set */
- public void setThreads(int threads) {
- this.threads = threads;
- }
- /** @return indexer */
- public Indexer getIndexer() {
- return indexer;
- }
- /** @param indexer to set */
- public void setIndex(Indexer indexer) {
- this.indexer = indexer;
- }
- }