BinaryProductSource.java

  1. package gov.usgs.earthquake.product.io;

  2. import gov.usgs.earthquake.product.InputStreamContent;
  3. import gov.usgs.earthquake.product.ProductId;
  4. import gov.usgs.util.StreamUtils;
  5. import gov.usgs.util.CryptoUtils.Version;

  6. import java.io.InputStream;
  7. import java.io.PipedInputStream;
  8. import java.io.PipedOutputStream;
  9. import java.net.URI;
  10. import java.net.URL;
  11. import java.util.Date;

  12. /**
  13.  * Parser for binary format for product data.
  14.  */
  15. public class BinaryProductSource implements ProductSource {

  16.     /** product being parsed. */
  17.     private ProductId id;
  18.     /** stream being parsed. */
  19.     private InputStream in;
  20.     /** binary io utility. */
  21.     private BinaryIO io;

  22.     /**
  23.      * Constructor. Sets up a new BinaryIO
  24.      * @param in an InputStream
  25.      */
  26.     public BinaryProductSource(final InputStream in) {
  27.         this.in = in;
  28.         this.io = new BinaryIO();
  29.     }

  30.     @Override
  31.     public void streamTo(ProductHandler out) throws Exception {
  32.         try {
  33.             while (true) {
  34.                 String next = io.readString(in);

  35.                 if (next.equals(BinaryProductHandler.HEADER)) {
  36.                     // begin product
  37.                     id = ProductId.parse(io.readString(in));
  38.                     String status = io.readString(in);
  39.                     // allow tracker url to be null
  40.                     URL trackerURL = null;
  41.                     String trackerURLString = io.readString(in);
  42.                     if (!trackerURLString.equalsIgnoreCase("null")) {
  43.                         trackerURL = new URL(trackerURLString);
  44.                     }
  45.                     out.onBeginProduct(id, status, trackerURL);
  46.                 } else if (next.equals(BinaryProductHandler.PROPERTY)) {
  47.                     String name = io.readString(in);
  48.                     String value = io.readString(in);
  49.                     out.onProperty(id, name, value);
  50.                 } else if (next.equals(BinaryProductHandler.LINK)) {
  51.                     String relation = io.readString(in);
  52.                     URI href = new URI(io.readString(in));
  53.                     out.onLink(id, relation, href);
  54.                 } else if (next.equals(BinaryProductHandler.CONTENT)) {
  55.                     String path = io.readString(in);
  56.                     String contentType = io.readString(in);
  57.                     Date lastModified = io.readDate(in);
  58.                     Long length = io.readLong(in);

  59.                     // use a piped output stream to deliver content to separate
  60.                     // processing thread. this thread will continue to read
  61.                     // InputStream, transfer content to pipedOutputStream.
  62.                     // Background thread calls onContent, and reads from
  63.                     // pipedInputStream.
  64.                     PipedOutputStream pipedOut = new PipedOutputStream();
  65.                     PipedInputStream pipedIn = new PipedInputStream(pipedOut);

  66.                     final InputStreamContent content = new InputStreamContent(
  67.                             pipedIn);
  68.                     content.setContentType(contentType);
  69.                     content.setLastModified(lastModified);
  70.                     content.setLength(length);

  71.                     // background thread delivers content object to product handler
  72.                     ContentOutputThread outputThread = new ContentOutputThread(out, id, path, content);

  73.                     try {
  74.                         outputThread.start();

  75.                         // read stream content
  76.                         io.readStream(length, in, pipedOut);
  77.                     } finally {
  78.                         // done reading content, close piped stream to signal EOF.
  79.                         StreamUtils.closeStream(pipedOut);
  80.                         pipedOut = null;
  81.                         try {
  82.                             // wait for background thread to complete
  83.                             outputThread.join();
  84.                         } catch (Exception e) {
  85.                             // ignore
  86.                         }
  87.                         outputThread = null;
  88.                         content.close();
  89.                     }

  90.                 } else if (next.equals(BinaryProductHandler.SIGNATUREVERSION)) {
  91.                     Version version = Version.fromString(io.readString(in));
  92.                     out.onSignatureVersion(id, version);
  93.                 } else if (next.equals(BinaryProductHandler.SIGNATURE)) {
  94.                     String signature = io.readString(in);
  95.                     out.onSignature(id, signature);
  96.                 } else if (next.equals(BinaryProductHandler.FOOTER)) {
  97.                     out.onEndProduct(id);
  98.                     id = null;

  99.                     // end of product stream
  100.                     break;
  101.                 }
  102.             }
  103.         } finally {
  104.             StreamUtils.closeStream(in);
  105.         }
  106.     }


  107.     /**
  108.      * Free any resources associated with this source.
  109.      */
  110.     @Override
  111.     public void close() {
  112.         StreamUtils.closeStream(in);
  113.         in = null;
  114.     }

  115. }