SearchServerSocket.java

  1. package gov.usgs.earthquake.indexer;

  2. import java.io.BufferedInputStream;
  3. import java.io.BufferedOutputStream;
  4. import java.io.InputStream;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import java.util.concurrent.Executors;
  8. import java.util.logging.Level;
  9. import java.util.logging.Logger;
  10. import java.util.zip.DeflaterOutputStream;
  11. import java.util.zip.InflaterInputStream;

  12. import gov.usgs.earthquake.distribution.ConfigurationException;
  13. import gov.usgs.util.Config;
  14. import gov.usgs.util.DefaultConfigurable;
  15. import gov.usgs.util.SocketAcceptor;
  16. import gov.usgs.util.SocketListenerInterface;
  17. import gov.usgs.util.StreamUtils;

  18. /**
  19.  * Server side of socket search interface.
  20.  */
  21. public class SearchServerSocket extends DefaultConfigurable implements
  22.         SocketListenerInterface {

  23.     /** Logging object. */
  24.     private static final Logger LOGGER = Logger
  25.             .getLogger(SearchServerSocket.class.getName());

  26.     /** The configuration property used for listen port. */
  27.     public static final String SEARCH_PORT_PROPERTY = "port";

  28.     /** The default listen port, as a string. */
  29.     public static final String DEFAULT_SEARCH_PORT = "11236";

  30.     /** The configuration property used for listen thread count. */
  31.     public static final String THREAD_POOL_SIZE_PROPERTY = "threads";

  32.     /** The default number of threads, as a string. */
  33.     public static final String DEFAULT_THREAD_POOL_SIZE = "10";

  34.     /** The configuration property used to reference a ProductIndex. */
  35.     public static final String PRODUCT_INDEXER_PROPERTY = "indexer";

  36.     /** The configuration property used to reference a URLProductStorage. */
  37.     public static final String PRODUCT_STORAGE_PROPERTY = "storage";

  38.     /** The port to bind. */
  39.     private int port = -1;

  40.     /** The number of threads to use. */
  41.     private int threads = -1;

  42.     /** The server socket accept thread. */
  43.     private SocketAcceptor acceptor;

  44.     /** The indexer that will be searched. */
  45.     private Indexer indexer;

  46.     /**
  47.      * Construct a new SearchServerSocket using defaults.
  48.      */
  49.     public SearchServerSocket() {
  50.         this.port = Integer.parseInt(DEFAULT_SEARCH_PORT);
  51.         this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
  52.     }

  53.     /**
  54.      * Method to perform search.
  55.      *
  56.      * Calls Indexer.search(SearchRequest). Simplifies testing.
  57.      *
  58.      * @param request
  59.      *            the search to execute.
  60.      * @return the search response.
  61.      * @throws Exception if error occurs
  62.      */
  63.     protected SearchResponse search(final SearchRequest request)
  64.             throws Exception {
  65.         return indexer.search(request);
  66.     }

  67.     /**
  68.      * This method is called each time a SearchSocket connects.
  69.      */
  70.     @Override
  71.     public void onSocket(Socket socket) {
  72.         LOGGER.info("[" + getName() + "] accepted search connection "
  73.                 + socket.toString());

  74.         InputStream in = null;
  75.         DeflaterOutputStream out = null;

  76.         try {
  77.             in = socket.getInputStream();
  78.             in = new InflaterInputStream(new BufferedInputStream(
  79.                     new StreamUtils.UnclosableInputStream(in)));
  80.             // read request
  81.             SearchRequest request = SearchXML
  82.                     .parseRequest(new StreamUtils.UnclosableInputStream(in));

  83.             // do search
  84.             SearchResponse response = this.search(request);

  85.             // send response
  86.             out = new DeflaterOutputStream(new BufferedOutputStream(
  87.                     socket.getOutputStream()));
  88.             SearchXML.toXML(response, new StreamUtils.UnclosableOutputStream(
  89.                     out));

  90.             // finish compression
  91.             out.finish();
  92.             out.flush();
  93.         } catch (Exception ex) {
  94.             LOGGER.log(Level.WARNING, "[" + getName()
  95.                     + "] exception while processing search", ex);
  96.         } finally {
  97.             StreamUtils.closeStream(in);
  98.             StreamUtils.closeStream(out);

  99.             try {
  100.                 socket.shutdownInput();
  101.             } catch (Exception e) {
  102.                 // ignore
  103.             }
  104.             try {
  105.                 socket.shutdownOutput();
  106.             } catch (Exception e) {
  107.                 // ignore
  108.             }

  109.             try {
  110.                 socket.close();
  111.             } catch (Exception e) {
  112.                 // ignore
  113.             }
  114.         }

  115.         LOGGER.info("[" + getName() + "] closed search connection "
  116.                 + socket.toString());
  117.     }

  118.     @Override
  119.     public void configure(Config config) throws Exception {
  120.         port = Integer.parseInt(config.getProperty(SEARCH_PORT_PROPERTY,
  121.                 DEFAULT_SEARCH_PORT));
  122.         LOGGER.config("[" + getName() + "] search port is " + port);

  123.         threads = Integer.parseInt(config.getProperty(
  124.                 THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
  125.         LOGGER.config("[" + getName() + "] number of threads is " + threads);

  126.         String indexerName = config.getProperty(PRODUCT_INDEXER_PROPERTY);
  127.         if (indexerName == null) {
  128.             throw new ConfigurationException("[" + getName() + "] '"
  129.                     + PRODUCT_INDEXER_PROPERTY
  130.                     + "' is a required configuration property");
  131.         }
  132.         LOGGER.config("[" + getName() + "] loading indexer '" + indexerName
  133.                 + "'");
  134.         indexer = (Indexer) Config.getConfig().getObject(indexerName);
  135.         if (indexer == null) {
  136.             throw new ConfigurationException("[" + getName() + "] indexer '"
  137.                     + indexerName + "' is not configured properly");
  138.         }
  139.     }

  140.     @Override
  141.     public void shutdown() throws Exception {
  142.         // stop accepting connections
  143.         acceptor.stop();
  144.         acceptor = null;
  145.     }

  146.     @Override
  147.     public void startup() throws Exception {
  148.         ServerSocket socket = new ServerSocket(port);
  149.         socket.setReuseAddress(true);
  150.         acceptor = new SocketAcceptor(socket, this,
  151.                 Executors.newFixedThreadPool(threads));
  152.         // start accepting connections via socket
  153.         acceptor.start();
  154.     }

  155.     /** @return int port */
  156.     public int getPort() {
  157.         return port;
  158.     }

  159.     /** @param port int to set */
  160.     public void setPort(int port) {
  161.         this.port = port;
  162.     }

  163.     /** @return int threads */
  164.     public int getThreads() {
  165.         return threads;
  166.     }

  167.     /** @param threads into to set */
  168.     public void setThreads(int threads) {
  169.         this.threads = threads;
  170.     }

  171.     /** @return indexer */
  172.     public Indexer getIndexer() {
  173.         return indexer;
  174.     }

  175.     /** @param indexer to set */
  176.     public void setIndex(Indexer indexer) {
  177.         this.indexer = indexer;
  178.     }

  179. }