SearchResponseXmlProductSource.java
/*
* SearchResponseXmlProductSource
*/
package gov.usgs.earthquake.indexer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.ProductHandler;
import gov.usgs.earthquake.product.io.XmlProductHandler;
import gov.usgs.earthquake.product.io.XmlProductSource;
/**
* Used by SearchResponseParser to store products during parsing.
*
* Creates a "background" storage thread for storing, while this classes
* startElement, characters, and endElement methods are called by the
* "foreground" xml parsing thread.
*/
public class SearchResponseXmlProductSource extends XmlProductSource {
/** Logging object. */
private static final Logger LOGGER = Logger
.getLogger(SearchResponseXmlProductSource.class.getName());
/** The storage where the product is streamed. */
private FileProductStorage storage;
/** The stored id. */
private Product storedProduct = null;
/** The thread where storage does its thing (current thread is xml parsing). */
private Thread storageThread;
/** */
private final Object waitForSetHandlerSync = new Object();
/** */
private final Object waitForStreamToSync = new Object();
/** start product attributes, for acquiring writelock in background thread */
private String uri;
private String localName;
private String qName;
private Attributes attributes;
private SAXException exception;
/**
* Construct a SearchResponseXmlProductSource.
*
* @param storage
* the storage where the parsed product is stored.
*/
public SearchResponseXmlProductSource(final FileProductStorage storage) {
super((ProductHandler) null);
this.setStorage(storage);
}
/**
* Called by the underlying product storage as part os storeProductSource.
*
* This method notifies the XML parsing thread that parsing may continue,
* since the handler is now setup.
*/
@Override
public void streamTo(final ProductHandler handler) {
this.setHandler(handler);
try {
// start the product in this thread, so the write lock is acquired
// and released in the same thread
super.startElement(uri, localName, qName, attributes);
} catch (SAXException e) {
exception = e;
}
// clear references that are no longer needed
this.uri = null;
this.localName = null;
this.qName = null;
this.attributes = null;
synchronized (waitForSetHandlerSync) {
// notify xml parsing thread that handler is all set
waitForSetHandlerSync.notify();
}
synchronized (waitForStreamToSync) {
try {
// wait for xml parsing thread to notify streamTo is complete
waitForStreamToSync.wait();
} catch (Exception e) {
// ignore
}
}
}
@Override
public void startElement(final String uri, final String localName,
final String qName, final Attributes attributes)
throws SAXException {
boolean startElementAlreadySent = false;
if (uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE)
&& localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
// save these to write lock can be acquired by correct thread.
this.uri = uri;
this.localName = localName;
this.qName = qName;
this.attributes = attributes;
// starting a product, set up the storage handler/thread
// reference used by storage thread to set product
final SearchResponseXmlProductSource thisSource = this;
storageThread = new Thread() {
public void run() {
try {
ProductId id = storage
.storeProductSource(thisSource);
thisSource.setProduct(storage.getProduct(id));
} catch (Exception e) {
LOGGER.log(Level.WARNING,
"Exception while storing product", e);
thisSource.setProduct(null);
}
}
};
synchronized (waitForSetHandlerSync) {
storageThread.start();
try {
// wait for storage thread to call streamTo with
// handler
waitForSetHandlerSync.wait();
} catch (InterruptedException e) {
// ignore
}
// handler set, ready to continue parsing
// signal that we've already sent the startElement
startElementAlreadySent = true;
// if an exception was generated in background thread, throw
// it here
if (exception != null) {
throw exception;
}
}
}
if (!startElementAlreadySent) {
// forward call to parser
super.startElement(uri, localName, qName, attributes);
}
}
public void endElement(final String uri, final String localName,
final String qName) throws SAXException {
// forward call to parser
super.endElement(uri, localName, qName);
if (!uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE)) {
return;
}
if (localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
// done parsing the product
synchronized (waitForStreamToSync) {
// notify storageThread streamTo is complete
waitForStreamToSync.notify();
}
try {
// wait for storageThread to complete so storage will have
// called setProduct before returning
storageThread.join();
} catch (InterruptedException e) {
// ignore
} finally {
storageThread = null;
}
}
}
/** @param storage FileProductStorage to set */
public void setStorage(FileProductStorage storage) {
this.storage = storage;
}
/** @return FileProductStorage */
public FileProductStorage getStorage() {
return storage;
}
/**
* @return the parsed, stored product.
*/
public Product getProduct() {
return this.storedProduct;
}
/**
* Method used by storage to provide the parsed product.
*
* @param product to set
*/
protected void setProduct(final Product product) {
this.storedProduct = product;
}
}