XmlProductSource.java

/*
 * XmlProductSource
 */
package gov.usgs.earthquake.product.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Date;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
import gov.usgs.util.CryptoUtils.Version;
import gov.usgs.earthquake.product.InputStreamContent;
import gov.usgs.earthquake.product.URLContent;
import gov.usgs.earthquake.product.ProductId;


/**
 * Load a product from an InputStream containing XML.
 *
 * SAX events in the product namespace are translated into
 * {@link ProductHandler} callbacks as they are parsed. Embedded content is
 * streamed through a pipe: the parsing thread writes element text into the
 * pipe while a {@link ContentOutputThread} hands the readable end to the
 * handler.
 */
public class XmlProductSource extends DefaultHandler implements ProductSource {

	/** The input stream where xml is read. */
	private InputStream in;

	/** The ProductOutput where events are sent. */
	private ProductHandler out;

	/** The Product being parsed. */
	private ProductId id;

	/** Used to read content in ProductOutput while parsing continues. */
	private ContentOutputThread contentOutputThread;

	/** Used to send content to ProductOutput as it is read. */
	private PipedOutputStream contentOutputStream;

	/** Content being read. */
	private InputStreamContent content;

	/**
	 * Accumulates signature element text; non-null only while a signature
	 * element is open. Only touched from synchronized SAX callbacks, so an
	 * unsynchronized builder is sufficient.
	 */
	private StringBuilder signatureBuffer;

	/**
	 * Create a new XmlProductSource.
	 *
	 * @param in
	 *            the input stream where xml is read.
	 */
	public XmlProductSource(final InputStream in) {
		this.in = in;
	}

	/**
	 * Create a new XmlProductSource for embedding in another default handler.
	 *
	 * @param out
	 *            the ProductHandler to receive product events.
	 */
	public XmlProductSource(final ProductHandler out) {
		this.out = out;
	}

	/**
	 * Begin reading the input stream, sending events to out. The input
	 * stream is always closed before this method returns.
	 *
	 * @param out
	 *            the receiving ProductOutput.
	 * @throws Exception
	 *             if parsing fails.
	 */
	public synchronized void streamTo(ProductHandler out) throws Exception {
		try {
			this.out = out;
			XmlUtils.parse(in, this);
		} finally {
			StreamUtils.closeStream(in);
		}
	}

	/**
	 * Override DefaultHandler startElement. Dispatches product, property,
	 * link, content and signature elements in the product namespace to the
	 * matching handler callback; every other element is ignored.
	 *
	 * @param uri
	 *            element uri.
	 * @param localName
	 *            element localName.
	 * @param qName
	 *            element qName.
	 * @param attributes
	 *            element attributes.
	 * @throws SAXException
	 *             if the handler callback, or content setup, throws an
	 *             exception.
	 */
	@Override
	public synchronized void startElement(final String uri,
			final String localName, final String qName,
			final Attributes attributes) throws SAXException {
		if (!XmlProductHandler.PRODUCT_XML_NAMESPACE.equals(uri)) {
			// only elements in the product namespace are of interest
			return;
		}

		if (XmlProductHandler.PRODUCT_ELEMENT.equals(localName)) {
			startProduct(uri, attributes);
		} else if (XmlProductHandler.PROPERTY_ELEMENT.equals(localName)) {
			startProperty(uri, attributes);
		} else if (XmlProductHandler.LINK_ELEMENT.equals(localName)) {
			startLink(uri, attributes);
		} else if (XmlProductHandler.CONTENT_ELEMENT.equals(localName)) {
			startContent(uri, attributes);
		} else if (XmlProductHandler.SIGNATURE_ELEMENT.equals(localName)) {
			startSignature(uri, attributes);
		}
	}

	/** Parse the product id, status and tracker url, then call onBeginProduct. */
	private void startProduct(final String uri, final Attributes attributes)
			throws SAXException {
		id = ProductId.parse(XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.PRODUCT_ATTRIBUTE_ID));
		id.setUpdateTime(XmlUtils.getDate(XmlUtils.getAttribute(attributes,
				uri, XmlProductHandler.PRODUCT_ATTRIBUTE_UPDATED)));

		final String status = XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.PRODUCT_ATTRIBUTE_STATUS);

		URL trackerURL = null;
		try {
			trackerURL = new URL(XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.PRODUCT_ATTRIBUTE_TRACKER_URL));
		} catch (Exception e) {
			// deliberately ignored: an unparseable tracker url is passed to
			// the handler as null
		}

		try {
			out.onBeginProduct(id, status, trackerURL);
		} catch (Exception e) {
			throw new SAXException(e);
		}
	}

	/** Read a property name/value pair and call onProperty. */
	private void startProperty(final String uri, final Attributes attributes)
			throws SAXException {
		final String name = XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.PROPERTY_ATTRIBUTE_NAME);
		final String value = XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.PROPERTY_ATTRIBUTE_VALUE);

		try {
			out.onProperty(id, name, value);
		} catch (Exception e) {
			throw new SAXException(e);
		}
	}

	/** Read a link relation/href pair and call onLink; unparseable hrefs are skipped. */
	private void startLink(final String uri, final Attributes attributes)
			throws SAXException {
		final String relation = XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.LINK_ATTRIBUTE_RELATION);

		final URI href;
		try {
			href = new URI(XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.LINK_ATTRIBUTE_HREF));
		} catch (Exception e) {
			// skip links whose href cannot be turned into a URI
			return;
		}

		try {
			out.onLink(id, relation, href);
		} catch (Exception e) {
			throw new SAXException(e);
		}
	}

	/**
	 * Deliver url content immediately, or open the pipe and start the
	 * delivery thread for embedded content.
	 */
	private void startContent(final String uri, final Attributes attributes)
			throws SAXException {
		try {
			final String type = XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.CONTENT_ATTRIBUTE_TYPE);
			final Long length = Long.valueOf(XmlUtils.getAttribute(attributes,
					uri, XmlProductHandler.CONTENT_ATTRIBUTE_LENGTH));
			final Date modified = XmlUtils.getDate(XmlUtils.getAttribute(
					attributes, uri,
					XmlProductHandler.CONTENT_ATTRIBUTE_MODIFIED));
			final String path = XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.CONTENT_ATTRIBUTE_PATH);
			final String encoded = XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.CONTENT_ATTRIBUTE_ENCODED);
			final String href = XmlUtils.getAttribute(attributes, uri,
					XmlProductHandler.CONTENT_ATTRIBUTE_HREF);

			if (href != null) {
				// URL CONTENT
				// a malformed href is wrapped (once) by the catch below
				final URLContent urlContent = new URLContent(new URL(href));
				urlContent.setContentType(type);
				urlContent.setLength(length);
				urlContent.setLastModified(modified);

				out.onContent(id, path, urlContent);
			} else {
				// EMBEDDED CONTENT
				// set up a piped stream; content is base 64 decoded only
				// when the encoded attribute is exactly "true"
				final InputStream contentInputStream = openContentStream(
						"true".equals(encoded));

				content = new InputStreamContent(contentInputStream);
				content.setContentType(type);
				content.setLength(length);
				content.setLastModified(modified);

				// call onContent in separate thread so parsing thread
				// can continue. Element content is fed during the
				// characters method.
				contentOutputThread = new ContentOutputThread(out, id, path,
						content);
				contentOutputThread.start();
			}
		} catch (Exception e) {
			closeContent();
			throw new SAXException(e);
		}
	}

	/** Report the signature version and start buffering signature text. */
	private void startSignature(final String uri, final Attributes attributes)
			throws SAXException {
		final String version = XmlUtils.getAttribute(attributes, uri,
				XmlProductHandler.SIGNATURE_ATTRIBUTE_VERSION);
		try {
			// a signature element without a version attribute is treated as v1
			out.onSignatureVersion(id,
					version == null
					? Version.SIGNATURE_V1
					: Version.fromString(version));
		} catch (Exception e) {
			throw new SAXException(e);
		}
		signatureBuffer = new StringBuilder();
	}

	/**
	 * Override DefaultHandler endElement. Closes the content pipe at the end
	 * of a content element, delivers the buffered signature at the end of a
	 * signature element, and calls onEndProduct at the end of the product
	 * element.
	 *
	 * @param uri
	 *            element uri.
	 * @param localName
	 *            element localName.
	 * @param qName
	 *            element qName.
	 * @throws SAXException
	 *             if the handler callback throws an exception.
	 */
	@Override
	public synchronized void endElement(final String uri,
			final String localName, final String qName) throws SAXException {
		if (!XmlProductHandler.PRODUCT_XML_NAMESPACE.equals(uri)) {
			return;
		}

		try {
			if (XmlProductHandler.CONTENT_ELEMENT.equals(localName)) {
				// done reading content, close piped stream to signal EOF.
				closeContent();
			} else if (XmlProductHandler.SIGNATURE_ELEMENT.equals(localName)) {
				final String signature = signatureBuffer.toString();
				signatureBuffer = null;
				out.onSignature(id, signature);
			} else if (XmlProductHandler.PRODUCT_ELEMENT.equals(localName)) {
				out.onEndProduct(id);
			}
		} catch (Exception e) {
			closeContent();
			throw new SAXException(e);
		}
	}

	/**
	 * Override DefaultHandler characters. Text inside an embedded content
	 * element is written to the content pipe, text inside a signature
	 * element is appended to the signature buffer, and all other text is
	 * ignored.
	 *
	 * @param ch
	 *            content.
	 * @param start
	 *            position in content to read.
	 * @param length
	 *            length of content to read.
	 * @throws SAXException
	 *             if writing to the content pipe fails.
	 */
	@Override
	public synchronized void characters(final char[] ch, final int start,
			final int length) throws SAXException {
		if (contentOutputStream != null) {
			final String chars = new String(ch, start, length);
			try {
				// encode explicitly so the bytes delivered to the handler do
				// not depend on the platform default charset
				// NOTE(review): assumes product xml content is UTF-8 - confirm
				contentOutputStream.write(chars
						.getBytes(StandardCharsets.UTF_8));
			} catch (Exception e) {
				// close the piped stream if there was an exception
				closeContent();
				throw new SAXException(e);
			}
		} else if (signatureBuffer != null) {
			signatureBuffer.append(ch, start, length);
		}
		// otherwise ignore, only interested in content or signature
	}

	/** @return ProductHandler */
	protected synchronized ProductHandler getHandler() {
		return out;
	}

	/** @param out ProductHandler to set */
	protected synchronized void setHandler(ProductHandler out) {
		this.out = out;
	}


	/**
	 * Free any resources associated with this handler: the content pipe and
	 * delivery thread, the input stream, and the handler (when set).
	 */
	@Override
	public void close() {
		closeContent();

		StreamUtils.closeStream(in);
		if (out != null) {
			out.close();
		}
	}

	/**
	 * Set up a piped output stream used during parsing.
	 *
	 * The XmlProductSource parsing thread starts a background thread
	 * to deliver content to the handler, then continues parsing XML
	 * and delivers parsed content via the piped streams.
	 *
	 * If xml parsing completes as expected, the parsing thread
	 * will close the connection in {@link #closeContent()}.  If
	 * errors occur, the objects handling the product source object
	 * call closeContent to ensure the resource is closed.
	 *
	 * @param encoded if it needs to decode base 64 content
	 * @return a input stream of the Piped output stream
	 * @throws IOException if io error occurs
	 */
	@SuppressWarnings("resource")
	public InputStream openContentStream(boolean encoded) throws IOException {
		// EMBEDDED CONTENT
		contentOutputStream = new PipedOutputStream();
		InputStream contentInputStream = new PipedInputStream(
				contentOutputStream);

		// decode base 64 encoded content
		if (encoded) {
			// this stream is closed by closeContent()
			// either in this thread if everything succeeds,
			// or by the objects using the ProductSource.
			contentInputStream = Base64.getDecoder()
					.wrap(contentInputStream);
		}

		return contentInputStream;
	}

	/**
	 * Closes an open output stream, waits for the content delivery thread
	 * (if any) to finish, and closes the content being read. Safe to call
	 * when no content is open.
	 */
	public void closeContent() {
		// closing the write end signals EOF to the reader
		StreamUtils.closeStream(contentOutputStream);
		contentOutputStream = null;
		if (contentOutputThread != null) {
			try {
				contentOutputThread.join();
			} catch (Exception e) {
				// best effort, but do not lose the caller's interrupt
				if (e instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
			}
		}
		contentOutputThread = null;

		// clear the field before closing so later calls do not close the
		// same content object again
		final InputStreamContent finished = content;
		content = null;
		if (finished != null) {
			finished.close();
		}
	}

}