ProductBuilder.java
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.StringUtils;
import gov.usgs.util.CryptoUtils.Version;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.PrivateKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Essentials for building/sending products.
*
* This is the base class for other builders.
*
* Supported configurable properties:
* <dl>
* <dt>senders</dt>
* <dd>A comma delimited list of product senders to use when sending
* products.</dd>
* <dt>trackerURL</dt>
* <dd>Default tracker URL to assign to products that don't already have
* one.</dd>
* <dt>privateKeyFile</dt>
* <dd>Path to a private key that can be used to sign products.</dd>
* </dl>
*/
public class ProductBuilder extends DefaultConfigurable {
private static final Logger LOGGER = Logger.getLogger(ProductBuilder.class.getSimpleName());
/** Configurable property for senders. */
public static final String SENDERS_PROPERTY = "senders";
/** Property name used for configuring a tracker url. */
public static final String TRACKER_URL_PROPERTY = "trackerURL";
/** Private key filename configuration property. */
public static final String PRIVATE_KEY_PROPERTY = "privateKeyFile";
/** Signature version property. */
public static final String SIGNATURE_VERSION_PROPERTY = "signatureVersion";
/** Send in parallel. */
public static final String PARALLEL_SEND_PROPERTY = "parallelSend";
/** Bool for parallel send */
public static final String DEFAULT_PARALLEL_SEND = "true";
/** Timeout in seconds for parallel send. */
public static final String PARALLEL_SEND_TIMEOUT_PROPERTY = "parallelSendTimeout";
/** time in ms for parallel send timemout */
public static final String DEFAULT_PARALLEL_SEND_TIMEOUT = "300";
/** Default tracker url. */
public static final URL DEFAULT_TRACKER_URL;
static {
URL url = null;
try {
url = new URL("http://ehppdl1.cr.usgs.gov/tracker/");
} catch (MalformedURLException mue) {
LOGGER.severe("Failed to parse default tracker url.");
System.exit(1);
}
DEFAULT_TRACKER_URL = url;
}
/** List of senders where built products are sent. */
private List<ProductSender> senders = new LinkedList<ProductSender>();
/** Default trackerURL to set on sent products. */
private URL trackerURL;
/** Key used to sign sent products. */
private PrivateKey privateKey;
/** Signature version. */
private Version signatureVersion = Version.SIGNATURE_V1;
/** Whether to send in parallel. */
protected boolean parallelSend = true;
/** How long to wait before parallel send timeout. */
protected long parallelSendTimeout = 300L;
/** Default product builder constructor */
public ProductBuilder() {
trackerURL = DEFAULT_TRACKER_URL;
}
/**
* Send a product.
*
* If the product doesn't yet have a tracker URL, assigns current tracker URL to
* product. If the product has not yet been signed, and a privateKey is
* configured, signs the product before sending.
*
* @param product the product to send.
* @return map of all exceptions thrown, from Sender to corresponding Exception.
* @throws Exception if an error occurs while signing product.
*/
public Map<ProductSender, Exception> sendProduct(final Product product) throws Exception {
// doesn't already have a tracker url
if (product.getTrackerURL() == null) {
product.setTrackerURL(trackerURL);
}
// mark which version of client was used to create product
product.getProperties().put(ProductClient.PDL_CLIENT_VERSION_PROPERTY, ProductClient.RELEASE_VERSION);
// doesn't already have a signature.
if (privateKey != null && product.getSignature() == null) {
product.sign(privateKey, signatureVersion);
}
// send tracker update
new ProductTracker(product.getTrackerURL()).productCreated(this.getName(), product.getId());
// send product using all product senders.
if (parallelSend) {
return parallelSendProduct(senders, product, parallelSendTimeout);
}
// send sequentially if not parallel
Map<ProductSender, Exception> errors = new HashMap<ProductSender, Exception>();
Iterator<ProductSender> iter = new LinkedList<ProductSender>(senders).iterator();
while (iter.hasNext()) {
ProductSender sender = iter.next();
try {
sender.sendProduct(product);
} catch (Exception e) {
if (e instanceof ProductAlreadyInStorageException) {
// condense this message...
LOGGER.info("Product already in storage, id=" + product.getId().toString());
} else {
LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
errors.put(sender, e);
}
}
}
return errors;
}
/**
* @return list of product senders
*/
public List<ProductSender> getProductSenders() {
return senders;
}
/**
* Add a ProductSender.
*
* @param sender to add
*/
public void addProductSender(final ProductSender sender) {
senders.add(sender);
}
/**
* Remove a previously added ProductSender.
*
* @param sender to remove
*/
public void removeProductSender(final ProductSender sender) {
senders.remove(sender);
}
/** @return trackerURL */
public URL getTrackerURL() {
return trackerURL;
}
/** @param trackerURL to set */
public void setTrackerURL(URL trackerURL) {
this.trackerURL = trackerURL;
}
/** @return privateKey */
public PrivateKey getPrivateKey() {
return privateKey;
}
/** @param privateKey to set */
public void setPrivateKey(PrivateKey privateKey) {
this.privateKey = privateKey;
}
/** @return signatureVersion */
public Version getSignatureVersion() {
return signatureVersion;
}
/** @param signatureVersion to set */
public void setSignatureVersion(Version signatureVersion) {
this.signatureVersion = signatureVersion;
}
@Override
public void configure(final Config config) throws Exception {
Iterator<String> senderNames = StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",").iterator();
while (senderNames.hasNext()) {
String name = senderNames.next();
LOGGER.config("Loading sender " + name);
ProductSender sender = (ProductSender) Config.getConfig().getObject(name);
if (sender == null) {
throw new ConfigurationException("Unable to load sender '" + name + "', make sure it is properly configured.");
}
addProductSender(sender);
}
String url = config.getProperty(TRACKER_URL_PROPERTY);
if (url != null) {
trackerURL = new URL(url);
}
LOGGER.config("[" + getName() + "] Using tracker URL '" + trackerURL.toString() + "'");
String keyFilename = config.getProperty(PRIVATE_KEY_PROPERTY);
if (keyFilename != null) {
LOGGER.config("[" + getName() + "] Loading private key file '" + keyFilename + "'");
privateKey = CryptoUtils.readOpenSSHPrivateKey(StreamUtils.readStream(new File(keyFilename)), null);
}
String version = config.getProperty(SIGNATURE_VERSION_PROPERTY);
if (version != null) {
signatureVersion = Version.fromString(version);
}
LOGGER.config("[" + getName() + "] signature version = " + signatureVersion);
parallelSend = Boolean.valueOf(config.getProperty(PARALLEL_SEND_PROPERTY, DEFAULT_PARALLEL_SEND));
parallelSendTimeout = Long
.valueOf(config.getProperty(PARALLEL_SEND_TIMEOUT_PROPERTY, DEFAULT_PARALLEL_SEND_TIMEOUT));
LOGGER.config("[" + getName() + "] parallel send enabled=" + parallelSend + ", timeout=" + parallelSendTimeout);
}
@Override
public void shutdown() throws Exception {
Iterator<ProductSender> iter = senders.iterator();
while (iter.hasNext()) {
iter.next().shutdown();
}
}
@Override
public void startup() throws Exception {
Iterator<ProductSender> iter = senders.iterator();
while (iter.hasNext()) {
iter.next().startup();
}
}
/**
* Send a product to all ProductSenders concurrently.
*
* @param senders the senders to receive product.
* @param product the product to send.
* @param timeoutSeconds number of seconds before timing out, interrupting any
* pending send.
* @return exceptions that occured while sending. If map is empty, there were no
* exceptions.
*/
public static Map<ProductSender, Exception> parallelSendProduct(final List<ProductSender> senders,
final Product product, final long timeoutSeconds) {
final Map<ProductSender, Boolean> sendComplete = Collections.synchronizedMap(new HashMap<ProductSender, Boolean>());
final Map<ProductSender, Exception> sendExceptions = Collections
.synchronizedMap(new HashMap<ProductSender, Exception>());
Iterator<ProductSender> iter = senders.iterator();
List<Callable<Void>> sendTasks = new ArrayList<Callable<Void>>();
while (iter.hasNext()) {
final ProductSender sender = iter.next();
sendComplete.put(sender, false);
sendTasks.add(() -> {
try {
sender.sendProduct(product);
sendComplete.put(sender, true);
} catch (Exception e) {
if (e instanceof ProductAlreadyInStorageException) {
LOGGER.info("Product already in storage, id=" + product.getId().toString());
} else {
LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
sendExceptions.put(sender, e);
}
}
return null;
});
}
// run in parallel
ExecutorService sendExecutor = Executors.newFixedThreadPool(senders.size());
try {
sendExecutor.invokeAll(sendTasks, timeoutSeconds, TimeUnit.SECONDS);
} catch (Exception e) {
// this may be Interupted, NullPointer, or RejectedExecution
// in any case, this part is done and move on to checking send status
}
sendExecutor.shutdown();
// check whether send completed or was interrupted
for (ProductSender sender : sendComplete.keySet()) {
if (!sendComplete.get(sender) && sendExceptions.get(sender) == null) {
sendExceptions.put(sender, new InterruptedException());
}
}
return sendExceptions;
}
}