SearchSocket.java

  1. package gov.usgs.earthquake.indexer;

  2. import gov.usgs.earthquake.distribution.FileProductStorage;
  3. import gov.usgs.util.StreamUtils;

  4. import java.io.BufferedInputStream;
  5. import java.io.BufferedOutputStream;
  6. import java.io.InputStream;
  7. import java.io.OutputStream;
  8. import java.io.PipedInputStream;
  9. import java.io.PipedOutputStream;
  10. import java.net.InetAddress;
  11. import java.net.Socket;
  12. import java.util.zip.DeflaterOutputStream;
  13. import java.util.zip.InflaterInputStream;

  14. /**
  15.  * Client side of search socket interface.
  16.  */
  17. public class SearchSocket {

  18.     /** The remote host to connect. */
  19.     private final InetAddress host;

  20.     /** The remote port to connect. */
  21.     private final int port;

  22.     /**
  23.      * Construct a new SearchSocket.
  24.      *
  25.      * @param host
  26.      *            the remote host.
  27.      * @param port
  28.      *            the remote port.
  29.      */
  30.     public SearchSocket(final InetAddress host, final int port) {
  31.         this.host = host;
  32.         this.port = port;
  33.     }

  34.     /**
  35.      * Send a search request, converting the response to a java object.
  36.      *
  37.      * @param request
  38.      *            the request to send.
  39.      * @param storage
  40.      *            where received products are stored.
  41.      * @return the response.
  42.      * @throws Exception if error occurs
  43.      */
  44.     public SearchResponse search(final SearchRequest request, final FileProductStorage storage) throws Exception {
  45.         final PipedInputStream pipedIn = new PipedInputStream();
  46.         final PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);

  47.         // parse response in background, while searching
  48.         ResponseParserThread thread = new ResponseParserThread(pipedIn, storage);
  49.         thread.start();

  50.         // start search, sending response xml to response parser thread
  51.         search(request, pipedOut);

  52.         // wait for parsing to complete
  53.         thread.join();

  54.         // either return parsed object, or raise parse exception
  55.         if (thread.getSearchResponse() != null) {
  56.             return thread.getSearchResponse();
  57.         } else {
  58.             throw thread.getParseError();
  59.         }
  60.     }

  61.     /**
  62.      * Send a search request, writing the response to an outputstream.
  63.      *
  64.      * @param request
  65.      *            the request to send.
  66.      * @param responseOut
  67.      *            the outputstream to write.
  68.      * @throws Exception if error occurs
  69.      */
  70.     public void search(final SearchRequest request,
  71.             final OutputStream responseOut) throws Exception {
  72.         Socket socket = null;
  73.         DeflaterOutputStream out = null;
  74.         InputStream in = null;

  75.         try {
  76.             // connect to the configured endpoint
  77.             socket = new Socket(host, port);

  78.             // send the request as compressed xml
  79.             out = new DeflaterOutputStream(new BufferedOutputStream(
  80.                     socket.getOutputStream()));
  81.             SearchXML.toXML(request,
  82.                     new StreamUtils.UnclosableOutputStream(out));

  83.             // must finish and flush to complete Deflater stream
  84.             out.finish();
  85.             out.flush();

  86.             // now read response
  87.             in = new InflaterInputStream(new BufferedInputStream(
  88.                     socket.getInputStream()));
  89.             StreamUtils.transferStream(in, responseOut);
  90.         } finally {
  91.             // make sure socket is closed
  92.             try {
  93.                 socket.close();
  94.             } catch (Exception e) {
  95.                 // ignore
  96.             }
  97.         }
  98.     }

  99.     /**
  100.      * Thread used for parsing search response in background.
  101.      */
  102.     private static class ResponseParserThread extends Thread {

  103.         /** Input stream being parsed. */
  104.         private InputStream in = null;

  105.         /** Storage where received products are stored. */
  106.         private FileProductStorage storage = null;

  107.         /** The parsed search response. */
  108.         private SearchResponse searchResponse = null;

  109.         /** Parse error, if one happened. */
  110.         private Exception parseError = null;

  111.         public ResponseParserThread(final InputStream in, final FileProductStorage storage) {
  112.             this.in = in;
  113.             this.storage = storage;
  114.         }

  115.         public void run() {
  116.             try {
  117.                 searchResponse = SearchXML.parseResponse(in, storage);
  118.             } catch (Exception e) {
  119.                 searchResponse = null;
  120.                 parseError = e;
  121.             }
  122.         }

  123.         public SearchResponse getSearchResponse() {
  124.             return searchResponse;
  125.         }

  126.         public Exception getParseError() {
  127.             return parseError;
  128.         }
  129.     }

  130. }