BinaryProductSource.java
package gov.usgs.earthquake.product.io;
import gov.usgs.earthquake.product.InputStreamContent;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.CryptoUtils.Version;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.net.URL;
import java.util.Date;
/**
* Parser for binary format for product data.
*/
public class BinaryProductSource implements ProductSource {
/** product being parsed. */
private ProductId id;
/** stream being parsed. */
private InputStream in;
/** binary io utility. */
private BinaryIO io;
/**
* Constructor. Sets up a new BinaryIO
* @param in an InputStream
*/
public BinaryProductSource(final InputStream in) {
this.in = in;
this.io = new BinaryIO();
}
@Override
public void streamTo(ProductHandler out) throws Exception {
try {
while (true) {
String next = io.readString(in);
if (next.equals(BinaryProductHandler.HEADER)) {
// begin product
id = ProductId.parse(io.readString(in));
String status = io.readString(in);
// allow tracker url to be null
URL trackerURL = null;
String trackerURLString = io.readString(in);
if (!trackerURLString.equalsIgnoreCase("null")) {
trackerURL = new URL(trackerURLString);
}
out.onBeginProduct(id, status, trackerURL);
} else if (next.equals(BinaryProductHandler.PROPERTY)) {
String name = io.readString(in);
String value = io.readString(in);
out.onProperty(id, name, value);
} else if (next.equals(BinaryProductHandler.LINK)) {
String relation = io.readString(in);
URI href = new URI(io.readString(in));
out.onLink(id, relation, href);
} else if (next.equals(BinaryProductHandler.CONTENT)) {
String path = io.readString(in);
String contentType = io.readString(in);
Date lastModified = io.readDate(in);
Long length = io.readLong(in);
// use a piped output stream to deliver content to separate
// processing thread. this thread will continue to read
// InputStream, transfer content to pipedOutputStream.
// Background thread calls onContent, and reads from
// pipedInputStream.
PipedOutputStream pipedOut = new PipedOutputStream();
PipedInputStream pipedIn = new PipedInputStream(pipedOut);
final InputStreamContent content = new InputStreamContent(
pipedIn);
content.setContentType(contentType);
content.setLastModified(lastModified);
content.setLength(length);
// background thread delivers content object to product handler
ContentOutputThread outputThread = new ContentOutputThread(out, id, path, content);
try {
outputThread.start();
// read stream content
io.readStream(length, in, pipedOut);
} finally {
// done reading content, close piped stream to signal EOF.
StreamUtils.closeStream(pipedOut);
pipedOut = null;
try {
// wait for background thread to complete
outputThread.join();
} catch (Exception e) {
// ignore
}
outputThread = null;
content.close();
}
} else if (next.equals(BinaryProductHandler.SIGNATUREVERSION)) {
Version version = Version.fromString(io.readString(in));
out.onSignatureVersion(id, version);
} else if (next.equals(BinaryProductHandler.SIGNATURE)) {
String signature = io.readString(in);
out.onSignature(id, signature);
} else if (next.equals(BinaryProductHandler.FOOTER)) {
out.onEndProduct(id);
id = null;
// end of product stream
break;
}
}
} finally {
StreamUtils.closeStream(in);
}
}
/**
* Free any resources associated with this source.
*/
@Override
public void close() {
StreamUtils.closeStream(in);
in = null;
}
}