SearchResponseXmlProductSource.java
- /*
- * SearchResponseXmlProductSource
- */
- package gov.usgs.earthquake.indexer;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import org.xml.sax.Attributes;
- import org.xml.sax.SAXException;
- import gov.usgs.earthquake.distribution.FileProductStorage;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.earthquake.product.io.ProductHandler;
- import gov.usgs.earthquake.product.io.XmlProductHandler;
- import gov.usgs.earthquake.product.io.XmlProductSource;
- /**
- * Used by SearchResponseParser to store products during parsing.
- *
- * Creates a "background" storage thread for storing, while this class's
- * startElement, characters, and endElement methods are called by the
- * "foreground" xml parsing thread.
- */
- public class SearchResponseXmlProductSource extends XmlProductSource {
- /** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(SearchResponseXmlProductSource.class.getName());
- /** The storage where the product is streamed. */
- private FileProductStorage storage;
- /** The stored id. */
- private Product storedProduct = null;
- /** The thread where storage does its thing (current thread is xml parsing). */
- private Thread storageThread;
- /** */
- private final Object waitForSetHandlerSync = new Object();
- /** */
- private final Object waitForStreamToSync = new Object();
- /** start product attributes, for acquiring writelock in background thread */
- private String uri;
- private String localName;
- private String qName;
- private Attributes attributes;
- private SAXException exception;
- /**
- * Construct a SearchResponseXmlProductSource.
- *
- * @param storage
- * the storage where the parsed product is stored.
- */
- public SearchResponseXmlProductSource(final FileProductStorage storage) {
- super((ProductHandler) null);
- this.setStorage(storage);
- }
- /**
- * Called by the underlying product storage as part os storeProductSource.
- *
- * This method notifies the XML parsing thread that parsing may continue,
- * since the handler is now setup.
- */
- @Override
- public void streamTo(final ProductHandler handler) {
- this.setHandler(handler);
- try {
- // start the product in this thread, so the write lock is acquired
- // and released in the same thread
- super.startElement(uri, localName, qName, attributes);
- } catch (SAXException e) {
- exception = e;
- }
- // clear references that are no longer needed
- this.uri = null;
- this.localName = null;
- this.qName = null;
- this.attributes = null;
- synchronized (waitForSetHandlerSync) {
- // notify xml parsing thread that handler is all set
- waitForSetHandlerSync.notify();
- }
- synchronized (waitForStreamToSync) {
- try {
- // wait for xml parsing thread to notify streamTo is complete
- waitForStreamToSync.wait();
- } catch (Exception e) {
- // ignore
- }
- }
- }
- @Override
- public void startElement(final String uri, final String localName,
- final String qName, final Attributes attributes)
- throws SAXException {
- boolean startElementAlreadySent = false;
- if (uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE)
- && localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
- // save these to write lock can be acquired by correct thread.
- this.uri = uri;
- this.localName = localName;
- this.qName = qName;
- this.attributes = attributes;
- // starting a product, set up the storage handler/thread
- // reference used by storage thread to set product
- final SearchResponseXmlProductSource thisSource = this;
- storageThread = new Thread() {
- public void run() {
- try {
- ProductId id = storage
- .storeProductSource(thisSource);
- thisSource.setProduct(storage.getProduct(id));
- } catch (Exception e) {
- LOGGER.log(Level.WARNING,
- "Exception while storing product", e);
- thisSource.setProduct(null);
- }
- }
- };
- synchronized (waitForSetHandlerSync) {
- storageThread.start();
- try {
- // wait for storage thread to call streamTo with
- // handler
- waitForSetHandlerSync.wait();
- } catch (InterruptedException e) {
- // ignore
- }
- // handler set, ready to continue parsing
- // signal that we've already sent the startElement
- startElementAlreadySent = true;
- // if an exception was generated in background thread, throw
- // it here
- if (exception != null) {
- throw exception;
- }
- }
- }
- if (!startElementAlreadySent) {
- // forward call to parser
- super.startElement(uri, localName, qName, attributes);
- }
- }
- public void endElement(final String uri, final String localName,
- final String qName) throws SAXException {
- // forward call to parser
- super.endElement(uri, localName, qName);
- if (!uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE)) {
- return;
- }
- if (localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
- // done parsing the product
- synchronized (waitForStreamToSync) {
- // notify storageThread streamTo is complete
- waitForStreamToSync.notify();
- }
- try {
- // wait for storageThread to complete so storage will have
- // called setProduct before returning
- storageThread.join();
- } catch (InterruptedException e) {
- // ignore
- } finally {
- storageThread = null;
- }
- }
- }
- /** @param storage FileProductStorage to set */
- public void setStorage(FileProductStorage storage) {
- this.storage = storage;
- }
- /** @return FileProductStorage */
- public FileProductStorage getStorage() {
- return storage;
- }
- /**
- * @return the parsed, stored product.
- */
- public Product getProduct() {
- return this.storedProduct;
- }
- /**
- * Method used by storage to provide the parsed product.
- *
- * @param product to set
- */
- protected void setProduct(final Product product) {
- this.storedProduct = product;
- }
- }