ProductTracker.java
- /*
- * ProductTracker
- */
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.util.Config;
- import gov.usgs.util.StreamUtils;
- import gov.usgs.util.XmlUtils;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.URL;
- import java.net.URLConnection;
- import java.net.URLEncoder;
- import java.nio.charset.StandardCharsets;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Send updates and search sent updates about distribution status.
- *
- * ProductDistribution clients to send status updates about received
- * notifications, and processed products.
- *
- * <strong>Search Example</strong>
- *
- * <pre>
- * ProductTracker tracker = new ProductTracker(new URL(
- * "http://ehppdl1.cr.usgs.gov/tracker/"));
- * String source = "us";
- * String type = "losspager";
- * String code = "us2010abcd";
- * Date updateTime = null;
- * String className = null;
- * List<ProductTrackerUpdate> updates = tracker.getUpdates(source, type, code,
- * updateTime, className);
- * </pre>
- *
- * <strong>Update Example</strong>
- *
- * <pre>
- * Product product = ...;
- * ProductTracker tracker = new ProductTracker(product.getTrackerURL()).;
- * ProductTrackerUpdate update = new ProductTrackerUpdate(product.getTrackerURL(),
- * product.getId(),
- * "my component name",
- * "my component message");
- * tracker.sendUpdate(update);
- * </pre>
- *
- */
- public class ProductTracker {
- /** Logging object. */
- private static final Logger LOGGER = Logger.getLogger(ProductTracker.class
- .getName());
- /** Whether tracker updates are enabled in this vm. */
- private static boolean TRACKER_ENABLED = false;
- /**
- * Set whether sending tracker updates is enabled from this host.
- *
- * @param enabled
- * true to send tracker updates, false to disable.
- */
- public static void setTrackerEnabled(final boolean enabled) {
- TRACKER_ENABLED = enabled;
- }
- /** Location of tracker. */
- private URL trackerURL;
- /**
- * Create a new ProductTracker object.
- * @param trackerURL location of tracker
- */
- public ProductTracker(final URL trackerURL) {
- this.trackerURL = trackerURL;
- }
- /**
- * @return the trackerURL
- */
- public URL getTrackerURL() {
- return trackerURL;
- }
- /**
- * @param trackerURL
- * the trackerURL to set
- */
- public void setTrackerURL(URL trackerURL) {
- this.trackerURL = trackerURL;
- }
- /**
- * Send an update to this ProductTracker.
- *
- * @param update
- * the update to send to the tracker.
- * @return the update object processed by the tracker, including sequence
- * number, or null if unable to send.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate sendUpdate(final ProductTrackerUpdate update)
- throws Exception {
- HeartbeatListener.sendHeartbeatMessage(update.getClassName(), // component
- update.getMessage(), // key
- update.getId().toString() // value
- );
- String response = sendUpdateXML(update);
- try {
- List<ProductTrackerUpdate> updates = parseTrackerResponse(
- update.getTrackerURL(),
- StreamUtils.getInputStream(response));
- // should receive one update (the one that was sent)
- if (updates.size() == 1) {
- return updates.get(0);
- }
- } catch (Exception e) {
- // ignore
- }
- return null;
- }
- /**
- * Send an update to this ProductTracker.
- *
- * @param update
- * the update to send to the tracker.
- * @return the raw XML returned by the tracker, or null if unable to send.
- * @throws Exception if error occurs
- */
- public String sendUpdateXML(final ProductTrackerUpdate update)
- throws Exception {
- // make sure this update hasn't already been sent
- Long sequenceNumber = update.getSequenceNumber();
- if (sequenceNumber != null && sequenceNumber > 0) {
- throw new IllegalArgumentException(
- "ProductTrackerUpdate already has a sequence number.");
- }
- ProductId id = update.getId();
- // log the update no matter what
- LOGGER.fine(new StringBuffer().append(update.getMessage())
- .append(" source=").append(id.getSource()).append(", type=")
- .append(id.getType()).append(", code=").append(id.getCode())
- .append(", updateTime=").append(id.getUpdateTime().toString())
- .toString());
- if (!TRACKER_ENABLED) {
- LOGGER.finest("Tracker updates disabled, not sent");
- // didn't send update
- return null;
- }
- // build update request
- Map<String, String> request = new HashMap<String, String>();
- request.put("action", "update");
- request.put("source", id.getSource());
- request.put("type", id.getType());
- request.put("code", id.getCode());
- request.put("updateTime", Long.toString(id.getUpdateTime().getTime()));
- request.put("className", update.getClassName());
- request.put("message", update.getMessage());
- try {
- String response = post(update.getTrackerURL(), request);
- return response;
- } catch (Exception e) {
- LOGGER.log(Level.INFO, "Unable to post to tracker", e);
- }
- return null;
- }
- /**
- * Same as getUpdates with 0 for startid
- * @param source
- * product source.
- * @param type
- * product type.
- * @param code
- * product code.
- * @param updateTime
- * product update time.
- * @param className
- * module name.
- * @return updates matching the provided fields.
- * @throws Exception if error occurs
- */
- public List<ProductTrackerUpdate> getUpdates(final String source,
- final String type, final String code, final Date updateTime,
- final String className) throws Exception {
- return getUpdates(source, type, code, updateTime, className, 0L);
- }
- /**
- * Search for updates on this tracker.
- *
- * At least one field must be not null, or this method will return no
- * updates.
- *
- * @param source
- * product source.
- * @param type
- * product type.
- * @param code
- * product code.
- * @param updateTime
- * product update time.
- * @param className
- * module name.
- * @param startid
- * starting ID
- * @return updates matching the provided fields.
- * @throws Exception if error occurs
- */
- public List<ProductTrackerUpdate> getUpdates(final String source,
- final String type, final String code, final Date updateTime,
- final String className, final Long startid) throws Exception {
- String response = getUpdateXML(source, type, code, updateTime,
- className, startid);
- List<ProductTrackerUpdate> updates = parseTrackerResponse(trackerURL,
- StreamUtils.getInputStream(response));
- return updates;
- }
- /**
- * Search for updates on this tracker, returning raw xml.
- *
- * @param source
- * product source.
- * @param type
- * product type.
- * @param code
- * product code.
- * @param updateTime
- * product update time.
- * @param className
- * module name.
- * @param startid
- * start id
- * @return the raw xml response from the tracker.
- * @throws Exception if error occurs
- */
- public String getUpdateXML(final String source, final String type,
- final String code, final Date updateTime, final String className,
- final Long startid) throws Exception {
- Map<String, String> request = new HashMap<String, String>();
- request.put("action", "search");
- if (source != null) {
- request.put("source", source);
- }
- if (type != null) {
- request.put("type", type);
- }
- if (code != null) {
- request.put("code", code);
- }
- if (updateTime != null) {
- request.put("updateTime", Long.toString(updateTime.getTime()));
- }
- if (className != null) {
- request.put("className", className);
- }
- if (startid != null) {
- request.put("startid", Long.toString(startid));
- }
- String response = post(trackerURL, request);
- return response;
- }
- /**
- * Send a custom tracker update message.
- *
- * @param className
- * the module that is sending the message.
- * @param id
- * the product the message is about.
- * @param message
- * the message about the product.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate sendUpdate(final String className,
- final ProductId id, final String message) throws Exception {
- ProductTrackerUpdate update = new ProductTrackerUpdate(trackerURL, id,
- className, message);
- return sendUpdate(update);
- }
- /**
- * Send a productCreated update.
- *
- * @param className
- * the module that created the product.
- * @param id
- * the product that was created.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate productCreated(final String className,
- final ProductId id) throws Exception {
- ProductTrackerUpdate createdUpdate = new ProductTrackerUpdate(
- trackerURL, id, className, ProductTrackerUpdate.PRODUCT_CREATED);
- return sendUpdate(createdUpdate);
- }
- /**
- * Send a productIndexed update.
- *
- * @param className
- * the module that indexed the product.
- * @param id
- * the product that was indexed.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate productIndexed(final String className,
- final ProductId id) throws Exception {
- ProductTrackerUpdate indexedUpdate = new ProductTrackerUpdate(
- trackerURL, id, className, ProductTrackerUpdate.PRODUCT_INDEXED);
- return sendUpdate(indexedUpdate);
- }
- /**
- * Send a notificationSent update.
- *
- * @param className
- * the module that sent the notification.
- * @param notification
- * the notification that was sent.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate notificationSent(final String className,
- final Notification notification) throws Exception {
- ProductTrackerUpdate notifiedUpdate = new ProductTrackerUpdate(
- trackerURL, notification.getProductId(), className,
- ProductTrackerUpdate.NOTIFICATION_SENT);
- return sendUpdate(notifiedUpdate);
- }
- /**
- * Send a notificationReceived update.
- *
- * @param className
- * the module that received the notification.
- * @param notification
- * the notification that was received.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate notificationReceived(final String className,
- final Notification notification) throws Exception {
- ProductTrackerUpdate notifiedUpdate = new ProductTrackerUpdate(
- trackerURL, notification.getProductId(), className,
- ProductTrackerUpdate.NOTIFICATION_RECEIVED);
- return sendUpdate(notifiedUpdate);
- }
- /**
- * Send a productDownloaded update.
- *
- * @param className
- * the module that downloaded the product.
- * @param id
- * the product that was downloaded.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate productDownloaded(final String className,
- final ProductId id) throws Exception {
- ProductTrackerUpdate downloadedUpdate = new ProductTrackerUpdate(
- trackerURL, id, className,
- ProductTrackerUpdate.PRODUCT_DOWNLOADED);
- return sendUpdate(downloadedUpdate);
- }
- /**
- * Send a productReceived update.
- *
- * @param className
- * the module that received the product.
- * @param id
- * the product that was received.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate productReceived(final String className,
- final ProductId id) throws Exception {
- ProductTrackerUpdate receivedUpdate = new ProductTrackerUpdate(
- trackerURL, id, className,
- ProductTrackerUpdate.PRODUCT_RECEIVED);
- return sendUpdate(receivedUpdate);
- }
- /**
- * Send an exception update.
- *
- * @param className
- * the module that encountered an exception.
- * @param id
- * the product that was being processed.
- * @param e
- * the exception that was caught.
- * @return the sent update.
- * @throws Exception if error occurs
- */
- public ProductTrackerUpdate exception(final String className,
- final ProductId id, final Exception e) throws Exception {
- ProductTrackerUpdate exceptionUpdate = new ProductTrackerUpdate(
- trackerURL, id, className,
- ProductTrackerUpdate.PRODUCT_EXCEPTION + ": " + e.getMessage());
- return sendUpdate(exceptionUpdate);
- }
- /**
- * Encode data for a HTTP Post.
- *
- * @param data
- * a map containing name value pairs for encoding.
- * @return a string of encoded data.
- * @throws Exception if error occurs
- */
- public static String encodeURLData(final Map<String, String> data)
- throws Exception {
- StringBuffer buf = new StringBuffer();
- Iterator<String> iter = data.keySet().iterator();
- while (iter.hasNext()) {
- String key = iter.next();
- buf.append(URLEncoder.encode(key, StandardCharsets.UTF_8.toString()))
- .append("=")
- .append(URLEncoder.encode(data.get(key), StandardCharsets.UTF_8.toString()));
- if (iter.hasNext()) {
- buf.append("&");
- }
- }
- return buf.toString();
- }
- /**
- * Execute a HTTP Post.
- *
- * @param url
- * the target url.
- * @param data
- * the data to send.
- * @return the response text.
- * @throws Exception if error occurs
- */
- public static String post(final URL url, final Map<String, String> data)
- throws Exception {
- InputStream in = null;
- OutputStream out = null;
- String response = null;
- try {
- URLConnection connection = url.openConnection();
- if (data != null) {
- String encodedData = encodeURLData(data);
- connection.setDoOutput(true);
- out = connection.getOutputStream();
- StreamUtils.transferStream(encodedData, out);
- }
- in = connection.getInputStream();
- response = new String(StreamUtils.readStream(in));
- } finally {
- StreamUtils.closeStream(out);
- StreamUtils.closeStream(in);
- }
- return response;
- }
- /**
- * Parse xml received from a ProductTracker using a ProductTrackerParser.
- *
- * @param trackerURL
- * the trackerURL being parsed (so updates are flagged as from
- * this tracker).
- * @param updateStream
- * the XML response stream from a product tracker.
- * @return a list of parsed updates.
- */
- public List<ProductTrackerUpdate> parseTrackerResponse(
- final URL trackerURL, final InputStream updateStream) {
- ProductTrackerParser parser = new ProductTrackerParser(trackerURL);
- parser.parse(updateStream);
- return parser.getUpdates();
- }
- /** Search a product tracker. */
- public static final String ACTION_SEARCH = "--search";
- /** Send an update to a product tracker. */
- public static final String ACTION_UPDATE = "--update";
- /** Used when searching or sending an update to a product tracker. */
- public static final String ARGUMENT_CLASSNAME = "--class=";
- /** Used when searching or sending an update to a product tracker. */
- public static final String ARGUMENT_PRODUCT_ID = "--productid=";
- /** Used when searching. */
- public static final String ARGUMENT_START_ID = "--startid=";
- /**
- * Command Line Interface to ProductTracker.
- *
- * @param args CLI arguments
- * @throws Exception if error occurs
- */
- public static void main(final String[] args) throws Exception {
- // whether we are sending an update (true)
- boolean isUpdate = false;
- // whether we are searching (true)
- boolean isSearch = false;
- // The update message being sent to the tracker (updates only)
- String updateMessage = null;
- // The tracker to communicate with
- URL trackerURL = null;
- // The module that sent an update
- String className = null;
- // The product id, an alternate to source+type+code+updateTime
- ProductId id = null;
- // The product source
- String source = null;
- // The product type
- String type = null;
- // The product code
- String code = null;
- // The product update time
- Date updateTime = null;
- // The starting sequence number (searches only)
- Long startid = 0L;
- // use the default tracker url unless overridden by arguments
- String defaultTrackerURL = Config.getConfig().getProperty(
- CLIProductBuilder.TRACKER_URL_CONFIG_PROPERTY);
- if (defaultTrackerURL != null) {
- trackerURL = new URL(defaultTrackerURL);
- }
- // parse the arguments
- for (String arg : args) {
- if (arg.equals(ACTION_UPDATE)) {
- isUpdate = true;
- } else if (arg.equals(ACTION_SEARCH)) {
- isSearch = true;
- } else if (arg.startsWith(ARGUMENT_CLASSNAME)) {
- className = arg.replace(ARGUMENT_CLASSNAME, "");
- } else if (arg.startsWith(ARGUMENT_PRODUCT_ID)) {
- id = ProductId.parse(arg.replace(ARGUMENT_PRODUCT_ID, ""));
- source = id.getSource();
- type = id.getType();
- code = id.getCode();
- updateTime = id.getUpdateTime();
- } else if (arg.startsWith(CLIProductBuilder.TRACKER_URL_ARGUMENT)) {
- trackerURL = new URL(arg.replace(
- CLIProductBuilder.TRACKER_URL_ARGUMENT, ""));
- } else if (arg.startsWith(CLIProductBuilder.SOURCE_ARGUMENT)) {
- source = arg.replace(CLIProductBuilder.SOURCE_ARGUMENT, "");
- } else if (arg.startsWith(CLIProductBuilder.TYPE_ARGUMENT)) {
- type = arg.replace(CLIProductBuilder.TYPE_ARGUMENT, "");
- } else if (arg.startsWith(CLIProductBuilder.CODE_ARGUMENT)) {
- code = arg.replace(CLIProductBuilder.CODE_ARGUMENT, "");
- } else if (arg.startsWith(CLIProductBuilder.UPDATE_TIME_ARGUMENT)) {
- updateTime = XmlUtils.getDate(arg.replace(
- CLIProductBuilder.UPDATE_TIME_ARGUMENT, ""));
- } else if (arg.startsWith(ARGUMENT_START_ID)) {
- startid = Long.valueOf(arg.replace(ARGUMENT_START_ID, ""));
- }
- }
- // run the search or update
- ProductTracker tracker = new ProductTracker(trackerURL);
- if (isUpdate && isSearch) {
- LOGGER.severe("Both " + ACTION_UPDATE + " and " + ACTION_SEARCH
- + " present, only one allowed");
- System.exit(1);
- } else if (!isUpdate && !isSearch) {
- LOGGER.severe("Neither " + ACTION_UPDATE + " nor " + ACTION_SEARCH
- + " present, one is required");
- System.exit(1);
- } else if (isUpdate) {
- LOGGER.info("Reading update message from STDIN...");
- updateMessage = new String(StreamUtils.readStream(System.in));
- if (id == null) {
- id = new ProductId(source, type, code, updateTime);
- }
- System.out.println(tracker.sendUpdateXML(new ProductTrackerUpdate(
- trackerURL, id, className, updateMessage)));
- // LOGGER.info("Update id=" + update.getSequenceNumber());
- } else if (isSearch) {
- // default to search action
- /*
- * LOGGER.info("Searching for source=" + source + ", type=" + type +
- * ", code=" + code + ", updateTime=" + updateTime + ", className="
- * + className + ", startid=" + startid);
- */
- System.out.println(tracker.getUpdateXML(source, type, code,
- updateTime, className, startid));
- }
- }
- /** Usage for ProductTracker
- * @return CLI usage
- */
- public static String getUsage() {
- StringBuffer buf = new StringBuffer();
- buf.append("[--search] search a tracker (default)\n");
- buf.append(" when searching, include at least one parameter\n");
- buf.append("[--update] send an update to a tracker\n");
- buf.append(" when updating, productid and class are required\n");
- buf.append(" the update message is read from STDIN\n");
- buf.append("\n");
- buf.append("--trackerURL=URL which tracker to search or update\n");
- buf.append("[--productid=URN] the product to search or update\n");
- buf.append("[--source=SOURCE] a product source to search\n");
- buf.append("[--type=TYPE] a product type to search\n");
- buf.append("[--code=CODE] a product code to search\n");
- buf.append("[--updateTime=TIME] a product update time to search\n");
- buf.append("[--class=MODULE] a module to search or send an update\n");
- buf.append("[--startid=SEQ] only return updates with sequence number > SEQ\n");
- buf.append("\n");
- return buf.toString();
- }
- }