// BinaryProductSource.java

package gov.usgs.earthquake.product.io;

import gov.usgs.earthquake.product.InputStreamContent;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.CryptoUtils.Version;

import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.net.URL;
import java.util.Date;

/**
 * Parser for binary format for product data.
 *
 * Reads a sequence of records from an InputStream using {@link BinaryIO}.
 * Each record starts with a string marker (one of the constants declared on
 * {@link BinaryProductHandler}) followed by the fields for that record type.
 * Every parsed record is forwarded to a {@link ProductHandler}.
 */
public class BinaryProductSource implements ProductSource {

	/** product being parsed. */
	private ProductId id;
	/** stream being parsed. */
	private InputStream in;
	/** binary io utility. */
	private BinaryIO io;

	/**
	 * Constructor. Sets up a new BinaryIO
	 * @param in an InputStream
	 */
	public BinaryProductSource(final InputStream in) {
		this.in = in;
		this.io = new BinaryIO();
	}

	/**
	 * Parse the input stream and deliver each record to the handler.
	 *
	 * Records are read until a footer record has been delivered. A marker
	 * that matches none of the known record types is skipped and parsing
	 * continues with the next string read from the stream. The input stream
	 * is always closed before this method returns, whether parsing completed
	 * or an exception was thrown.
	 *
	 * @param out handler that receives the parsed product events.
	 * @throws Exception if reading the stream, parsing a field, or a handler
	 *         callback fails.
	 */
	@Override
	public void streamTo(ProductHandler out) throws Exception {
		try {
			while (true) {
				String next = io.readString(in);

				if (next.equals(BinaryProductHandler.HEADER)) {
					// begin product
					id = ProductId.parse(io.readString(in));
					String status = io.readString(in);
					// allow tracker url to be null
					URL trackerURL = null;
					String trackerURLString = io.readString(in);
					if (!trackerURLString.equalsIgnoreCase("null")) {
						trackerURL = new URL(trackerURLString);
					}
					out.onBeginProduct(id, status, trackerURL);
				} else if (next.equals(BinaryProductHandler.PROPERTY)) {
					String name = io.readString(in);
					String value = io.readString(in);
					out.onProperty(id, name, value);
				} else if (next.equals(BinaryProductHandler.LINK)) {
					String relation = io.readString(in);
					URI href = new URI(io.readString(in));
					out.onLink(id, relation, href);
				} else if (next.equals(BinaryProductHandler.CONTENT)) {
					streamContent(out);
				} else if (next.equals(BinaryProductHandler.SIGNATUREVERSION)) {
					Version version = Version.fromString(io.readString(in));
					out.onSignatureVersion(id, version);
				} else if (next.equals(BinaryProductHandler.SIGNATURE)) {
					String signature = io.readString(in);
					out.onSignature(id, signature);
				} else if (next.equals(BinaryProductHandler.FOOTER)) {
					out.onEndProduct(id);
					id = null;

					// end of product stream
					break;
				}
			}
		} finally {
			StreamUtils.closeStream(in);
		}
	}

	/**
	 * Read one content record (path, content type, last modified date,
	 * length, then the content bytes) and hand it to the handler.
	 *
	 * The content bytes are copied from the input stream into a pipe by the
	 * calling thread, while a ContentOutputThread is started with the
	 * handler, product id, path and a content object wrapping the read end
	 * of the pipe.
	 *
	 * @param out handler passed to the background output thread.
	 * @throws Exception if reading the record or copying the content fails.
	 */
	private void streamContent(final ProductHandler out) throws Exception {
		String path = io.readString(in);
		String contentType = io.readString(in);
		Date lastModified = io.readDate(in);
		Long length = io.readLong(in);

		// use a piped output stream to deliver content to separate
		// processing thread. this thread will continue to read
		// InputStream, transfer content to pipedOutputStream.
		// Background thread reads from pipedInputStream.
		final PipedOutputStream pipedOut = new PipedOutputStream();
		final PipedInputStream pipedIn = new PipedInputStream(pipedOut);

		final InputStreamContent content = new InputStreamContent(pipedIn);
		content.setContentType(contentType);
		content.setLastModified(lastModified);
		content.setLength(length);

		// background thread delivers content object to product handler
		final ContentOutputThread outputThread = new ContentOutputThread(
				out, id, path, content);

		try {
			outputThread.start();

			// read stream content
			io.readStream(length, in, pipedOut);
		} finally {
			// done reading content, close piped stream to signal EOF.
			StreamUtils.closeStream(pipedOut);
			try {
				// wait for background thread to complete
				outputThread.join();
			} catch (InterruptedException e) {
				// do not fail the parse here, but restore the interrupt
				// status so it is not silently lost to callers
				Thread.currentThread().interrupt();
			}
			content.close();
		}
	}

	/**
	 * Free any resources associated with this source.
	 *
	 * Closes the input stream and clears the reference to it.
	 */
	@Override
	public void close() {
		StreamUtils.closeStream(in);
		in = null;
	}

}