ProductTracker.java

  1. /*
  2.  * ProductTracker
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import gov.usgs.earthquake.product.ProductId;
  6. import gov.usgs.util.Config;
  7. import gov.usgs.util.StreamUtils;
  8. import gov.usgs.util.XmlUtils;

  9. import java.io.InputStream;
  10. import java.io.OutputStream;
  11. import java.net.URL;
  12. import java.net.URLConnection;
  13. import java.net.URLEncoder;
  14. import java.nio.charset.StandardCharsets;
  15. import java.util.Date;
  16. import java.util.HashMap;
  17. import java.util.Iterator;
  18. import java.util.List;
  19. import java.util.Map;

  20. import java.util.logging.Level;
  21. import java.util.logging.Logger;

  22. /**
  23.  * Send updates and search sent updates about distribution status.
  24.  *
 * Used by ProductDistribution clients to send status updates about received
 * notifications and processed products.
  27.  *
  28.  * <strong>Search Example</strong>
  29.  *
  30.  * <pre>
  31.  * ProductTracker tracker = new ProductTracker(new URL(
  32.  *      &quot;http://ehppdl1.cr.usgs.gov/tracker/&quot;));
  33.  * String source = &quot;us&quot;;
  34.  * String type = &quot;losspager&quot;;
  35.  * String code = &quot;us2010abcd&quot;;
  36.  * Date updateTime = null;
  37.  * String className = null;
  38.  * List&lt;ProductTrackerUpdate&gt; updates = tracker.getUpdates(source, type, code,
  39.  *      updateTime, className);
  40.  * </pre>
  41.  *
  42.  * <strong>Update Example</strong>
  43.  *
  44.  * <pre>
  45.  * Product product = ...;
 * ProductTracker tracker = new ProductTracker(product.getTrackerURL());
  47.  * ProductTrackerUpdate update = new ProductTrackerUpdate(product.getTrackerURL(),
  48.  *      product.getId(),
  49.  *      "my component name",
  50.  *      "my component message");
  51.  * tracker.sendUpdate(update);
  52.  * </pre>
  53.  *
  54.  */
  55. public class ProductTracker {

  56.     /** Logging object. */
  57.     private static final Logger LOGGER = Logger.getLogger(ProductTracker.class
  58.             .getName());

  59.     /** Whether tracker updates are enabled in this vm. */
  60.     private static boolean TRACKER_ENABLED = false;

  61.     /**
  62.      * Set whether sending tracker updates is enabled from this host.
  63.      *
  64.      * @param enabled
  65.      *            true to send tracker updates, false to disable.
  66.      */
  67.     public static void setTrackerEnabled(final boolean enabled) {
  68.         TRACKER_ENABLED = enabled;
  69.     }

  70.     /** Location of tracker. */
  71.     private URL trackerURL;

  72.     /**
  73.      * Create a new ProductTracker object.
  74.      * @param trackerURL location of tracker
  75.      */
  76.     public ProductTracker(final URL trackerURL) {
  77.         this.trackerURL = trackerURL;
  78.     }

  79.     /**
  80.      * @return the trackerURL
  81.      */
  82.     public URL getTrackerURL() {
  83.         return trackerURL;
  84.     }

  85.     /**
  86.      * @param trackerURL
  87.      *            the trackerURL to set
  88.      */
  89.     public void setTrackerURL(URL trackerURL) {
  90.         this.trackerURL = trackerURL;
  91.     }

  92.     /**
  93.      * Send an update to this ProductTracker.
  94.      *
  95.      * @param update
  96.      *            the update to send to the tracker.
  97.      * @return the update object processed by the tracker, including sequence
  98.      *         number, or null if unable to send.
  99.      * @throws Exception if error occurs
  100.      */
  101.     public ProductTrackerUpdate sendUpdate(final ProductTrackerUpdate update)
  102.             throws Exception {

  103.         HeartbeatListener.sendHeartbeatMessage(update.getClassName(), // component
  104.                 update.getMessage(), // key
  105.                 update.getId().toString() // value
  106.                 );
  107.         String response = sendUpdateXML(update);
  108.         try {
  109.             List<ProductTrackerUpdate> updates = parseTrackerResponse(
  110.                     update.getTrackerURL(),
  111.                     StreamUtils.getInputStream(response));
  112.             // should receive one update (the one that was sent)
  113.             if (updates.size() == 1) {
  114.                 return updates.get(0);
  115.             }
  116.         } catch (Exception e) {
  117.             // ignore
  118.         }
  119.         return null;
  120.     }

  121.     /**
  122.      * Send an update to this ProductTracker.
  123.      *
  124.      * @param update
  125.      *            the update to send to the tracker.
  126.      * @return the raw XML returned by the tracker, or null if unable to send.
  127.      * @throws Exception if error occurs
  128.      */
  129.     public String sendUpdateXML(final ProductTrackerUpdate update)
  130.             throws Exception {

  131.         // make sure this update hasn't already been sent
  132.         Long sequenceNumber = update.getSequenceNumber();
  133.         if (sequenceNumber != null && sequenceNumber > 0) {
  134.             throw new IllegalArgumentException(
  135.                     "ProductTrackerUpdate already has a sequence number.");
  136.         }

  137.         ProductId id = update.getId();

  138.         // log the update no matter what
  139.         LOGGER.fine(new StringBuffer().append(update.getMessage())
  140.                 .append(" source=").append(id.getSource()).append(", type=")
  141.                 .append(id.getType()).append(", code=").append(id.getCode())
  142.                 .append(", updateTime=").append(id.getUpdateTime().toString())
  143.                 .toString());

  144.         if (!TRACKER_ENABLED) {
  145.             LOGGER.finest("Tracker updates disabled, not sent");
  146.             // didn't send update
  147.             return null;
  148.         }

  149.         // build update request
  150.         Map<String, String> request = new HashMap<String, String>();
  151.         request.put("action", "update");
  152.         request.put("source", id.getSource());
  153.         request.put("type", id.getType());
  154.         request.put("code", id.getCode());
  155.         request.put("updateTime", Long.toString(id.getUpdateTime().getTime()));
  156.         request.put("className", update.getClassName());
  157.         request.put("message", update.getMessage());

  158.         try {
  159.             String response = post(update.getTrackerURL(), request);
  160.             return response;
  161.         } catch (Exception e) {
  162.             LOGGER.log(Level.INFO, "Unable to post to tracker", e);
  163.         }
  164.         return null;
  165.     }

  166.     /**
  167.      * Same as getUpdates with 0 for startid
  168.      * @param source
  169.      *            product source.
  170.      * @param type
  171.      *            product type.
  172.      * @param code
  173.      *            product code.
  174.      * @param updateTime
  175.      *            product update time.
  176.      * @param className
  177.      *            module name.
  178.      * @return updates matching the provided fields.
  179.      * @throws Exception if error occurs
  180.      */
  181.     public List<ProductTrackerUpdate> getUpdates(final String source,
  182.             final String type, final String code, final Date updateTime,
  183.             final String className) throws Exception {
  184.         return getUpdates(source, type, code, updateTime, className, 0L);
  185.     }

  186.     /**
  187.      * Search for updates on this tracker.
  188.      *
  189.      * At least one field must be not null, or this method will return no
  190.      * updates.
  191.      *
  192.      * @param source
  193.      *            product source.
  194.      * @param type
  195.      *            product type.
  196.      * @param code
  197.      *            product code.
  198.      * @param updateTime
  199.      *            product update time.
  200.      * @param className
  201.      *            module name.
  202.      * @param startid
  203.      *            starting ID
  204.      * @return updates matching the provided fields.
  205.      * @throws Exception if error occurs
  206.      */
  207.     public List<ProductTrackerUpdate> getUpdates(final String source,
  208.             final String type, final String code, final Date updateTime,
  209.             final String className, final Long startid) throws Exception {
  210.         String response = getUpdateXML(source, type, code, updateTime,
  211.                 className, startid);
  212.         List<ProductTrackerUpdate> updates = parseTrackerResponse(trackerURL,
  213.                 StreamUtils.getInputStream(response));
  214.         return updates;
  215.     }

  216.     /**
  217.      * Search for updates on this tracker, returning raw xml.
  218.      *
  219.      * @param source
  220.      *            product source.
  221.      * @param type
  222.      *            product type.
  223.      * @param code
  224.      *            product code.
  225.      * @param updateTime
  226.      *            product update time.
  227.      * @param className
  228.      *            module name.
  229.      * @param startid
  230.      *            start id
  231.      * @return the raw xml response from the tracker.
  232.      * @throws Exception if error occurs
  233.      */
  234.     public String getUpdateXML(final String source, final String type,
  235.             final String code, final Date updateTime, final String className,
  236.             final Long startid) throws Exception {

  237.         Map<String, String> request = new HashMap<String, String>();
  238.         request.put("action", "search");

  239.         if (source != null) {
  240.             request.put("source", source);
  241.         }
  242.         if (type != null) {
  243.             request.put("type", type);
  244.         }
  245.         if (code != null) {
  246.             request.put("code", code);
  247.         }
  248.         if (updateTime != null) {
  249.             request.put("updateTime", Long.toString(updateTime.getTime()));
  250.         }

  251.         if (className != null) {
  252.             request.put("className", className);
  253.         }

  254.         if (startid != null) {
  255.             request.put("startid", Long.toString(startid));
  256.         }

  257.         String response = post(trackerURL, request);
  258.         return response;
  259.     }

  260.     /**
  261.      * Send a custom tracker update message.
  262.      *
  263.      * @param className
  264.      *            the module that is sending the message.
  265.      * @param id
  266.      *            the product the message is about.
  267.      * @param message
  268.      *            the message about the product.
  269.      * @return the sent update.
  270.      * @throws Exception if error occurs
  271.      */
  272.     public ProductTrackerUpdate sendUpdate(final String className,
  273.             final ProductId id, final String message) throws Exception {
  274.         ProductTrackerUpdate update = new ProductTrackerUpdate(trackerURL, id,
  275.                 className, message);
  276.         return sendUpdate(update);
  277.     }

  278.     /**
  279.      * Send a productCreated update.
  280.      *
  281.      * @param className
  282.      *            the module that created the product.
  283.      * @param id
  284.      *            the product that was created.
  285.      * @return the sent update.
  286.      * @throws Exception if error occurs
  287.      */
  288.     public ProductTrackerUpdate productCreated(final String className,
  289.             final ProductId id) throws Exception {
  290.         ProductTrackerUpdate createdUpdate = new ProductTrackerUpdate(
  291.                 trackerURL, id, className, ProductTrackerUpdate.PRODUCT_CREATED);
  292.         return sendUpdate(createdUpdate);
  293.     }

  294.     /**
  295.      * Send a productIndexed update.
  296.      *
  297.      * @param className
  298.      *            the module that indexed the product.
  299.      * @param id
  300.      *            the product that was indexed.
  301.      * @return the sent update.
  302.      * @throws Exception if error occurs
  303.      */
  304.     public ProductTrackerUpdate productIndexed(final String className,
  305.             final ProductId id) throws Exception {
  306.         ProductTrackerUpdate indexedUpdate = new ProductTrackerUpdate(
  307.                 trackerURL, id, className, ProductTrackerUpdate.PRODUCT_INDEXED);
  308.         return sendUpdate(indexedUpdate);
  309.     }

  310.     /**
  311.      * Send a notificationSent update.
  312.      *
  313.      * @param className
  314.      *            the module that sent the notification.
  315.      * @param notification
  316.      *            the notification that was sent.
  317.      * @return the sent update.
  318.      * @throws Exception if error occurs
  319.      */
  320.     public ProductTrackerUpdate notificationSent(final String className,
  321.             final Notification notification) throws Exception {
  322.         ProductTrackerUpdate notifiedUpdate = new ProductTrackerUpdate(
  323.                 trackerURL, notification.getProductId(), className,
  324.                 ProductTrackerUpdate.NOTIFICATION_SENT);
  325.         return sendUpdate(notifiedUpdate);
  326.     }

  327.     /**
  328.      * Send a notificationReceived update.
  329.      *
  330.      * @param className
  331.      *            the module that received the notification.
  332.      * @param notification
  333.      *            the notification that was received.
  334.      * @return the sent update.
  335.      * @throws Exception if error occurs
  336.      */
  337.     public ProductTrackerUpdate notificationReceived(final String className,
  338.             final Notification notification) throws Exception {
  339.         ProductTrackerUpdate notifiedUpdate = new ProductTrackerUpdate(
  340.                 trackerURL, notification.getProductId(), className,
  341.                 ProductTrackerUpdate.NOTIFICATION_RECEIVED);
  342.         return sendUpdate(notifiedUpdate);
  343.     }

  344.     /**
  345.      * Send a productDownloaded update.
  346.      *
  347.      * @param className
  348.      *            the module that downloaded the product.
  349.      * @param id
  350.      *            the product that was downloaded.
  351.      * @return the sent update.
  352.      * @throws Exception if error occurs
  353.      */
  354.     public ProductTrackerUpdate productDownloaded(final String className,
  355.             final ProductId id) throws Exception {
  356.         ProductTrackerUpdate downloadedUpdate = new ProductTrackerUpdate(
  357.                 trackerURL, id, className,
  358.                 ProductTrackerUpdate.PRODUCT_DOWNLOADED);
  359.         return sendUpdate(downloadedUpdate);
  360.     }

  361.     /**
  362.      * Send a productReceived update.
  363.      *
  364.      * @param className
  365.      *            the module that received the product.
  366.      * @param id
  367.      *            the product that was received.
  368.      * @return the sent update.
  369.      * @throws Exception if error occurs
  370.      */
  371.     public ProductTrackerUpdate productReceived(final String className,
  372.             final ProductId id) throws Exception {
  373.         ProductTrackerUpdate receivedUpdate = new ProductTrackerUpdate(
  374.                 trackerURL, id, className,
  375.                 ProductTrackerUpdate.PRODUCT_RECEIVED);
  376.         return sendUpdate(receivedUpdate);
  377.     }

  378.     /**
  379.      * Send an exception update.
  380.      *
  381.      * @param className
  382.      *            the module that encountered an exception.
  383.      * @param id
  384.      *            the product that was being processed.
  385.      * @param e
  386.      *            the exception that was caught.
  387.      * @return the sent update.
  388.      * @throws Exception if error occurs
  389.      */
  390.     public ProductTrackerUpdate exception(final String className,
  391.             final ProductId id, final Exception e) throws Exception {
  392.         ProductTrackerUpdate exceptionUpdate = new ProductTrackerUpdate(
  393.                 trackerURL, id, className,
  394.                 ProductTrackerUpdate.PRODUCT_EXCEPTION + ": " + e.getMessage());
  395.         return sendUpdate(exceptionUpdate);
  396.     }

  397.     /**
  398.      * Encode data for a HTTP Post.
  399.      *
  400.      * @param data
  401.      *            a map containing name value pairs for encoding.
  402.      * @return a string of encoded data.
  403.      * @throws Exception if error occurs
  404.      */
  405.     public static String encodeURLData(final Map<String, String> data)
  406.             throws Exception {
  407.         StringBuffer buf = new StringBuffer();
  408.         Iterator<String> iter = data.keySet().iterator();
  409.         while (iter.hasNext()) {
  410.             String key = iter.next();
  411.             buf.append(URLEncoder.encode(key, StandardCharsets.UTF_8.toString()))
  412.                     .append("=")
  413.                     .append(URLEncoder.encode(data.get(key), StandardCharsets.UTF_8.toString()));
  414.             if (iter.hasNext()) {
  415.                 buf.append("&");
  416.             }
  417.         }
  418.         return buf.toString();
  419.     }

  420.     /**
  421.      * Execute a HTTP Post.
  422.      *
  423.      * @param url
  424.      *            the target url.
  425.      * @param data
  426.      *            the data to send.
  427.      * @return the response text.
  428.      * @throws Exception if error occurs
  429.      */
  430.     public static String post(final URL url, final Map<String, String> data)
  431.             throws Exception {
  432.         InputStream in = null;
  433.         OutputStream out = null;
  434.         String response = null;

  435.         try {
  436.             URLConnection connection = url.openConnection();
  437.             if (data != null) {
  438.                 String encodedData = encodeURLData(data);
  439.                 connection.setDoOutput(true);
  440.                 out = connection.getOutputStream();
  441.                 StreamUtils.transferStream(encodedData, out);
  442.             }

  443.             in = connection.getInputStream();
  444.             response = new String(StreamUtils.readStream(in));
  445.         } finally {
  446.             StreamUtils.closeStream(out);
  447.             StreamUtils.closeStream(in);
  448.         }

  449.         return response;
  450.     }

  451.     /**
  452.      * Parse xml received from a ProductTracker using a ProductTrackerParser.
  453.      *
  454.      * @param trackerURL
  455.      *            the trackerURL being parsed (so updates are flagged as from
  456.      *            this tracker).
  457.      * @param updateStream
  458.      *            the XML response stream from a product tracker.
  459.      * @return a list of parsed updates.
  460.      */
  461.     public List<ProductTrackerUpdate> parseTrackerResponse(
  462.             final URL trackerURL, final InputStream updateStream) {
  463.         ProductTrackerParser parser = new ProductTrackerParser(trackerURL);
  464.         parser.parse(updateStream);
  465.         return parser.getUpdates();
  466.     }

  467.     /** Search a product tracker. */
  468.     public static final String ACTION_SEARCH = "--search";
  469.     /** Send an update to a product tracker. */
  470.     public static final String ACTION_UPDATE = "--update";
  471.     /** Used when searching or sending an update to a product tracker. */
  472.     public static final String ARGUMENT_CLASSNAME = "--class=";
  473.     /** Used when searching or sending an update to a product tracker. */
  474.     public static final String ARGUMENT_PRODUCT_ID = "--productid=";
  475.     /** Used when searching. */
  476.     public static final String ARGUMENT_START_ID = "--startid=";

  477.     /**
  478.      * Command Line Interface to ProductTracker.
  479.      *
  480.      * @param args CLI arguments
  481.      * @throws Exception if error occurs
  482.      */
  483.     public static void main(final String[] args) throws Exception {
  484.         // whether we are sending an update (true)
  485.         boolean isUpdate = false;
  486.         // whether we are searching (true)
  487.         boolean isSearch = false;
  488.         // The update message being sent to the tracker (updates only)
  489.         String updateMessage = null;
  490.         // The tracker to communicate with
  491.         URL trackerURL = null;
  492.         // The module that sent an update
  493.         String className = null;
  494.         // The product id, an alternate to source+type+code+updateTime
  495.         ProductId id = null;
  496.         // The product source
  497.         String source = null;
  498.         // The product type
  499.         String type = null;
  500.         // The product code
  501.         String code = null;
  502.         // The product update time
  503.         Date updateTime = null;
  504.         // The starting sequence number (searches only)
  505.         Long startid = 0L;

  506.         // use the default tracker url unless overridden by arguments
  507.         String defaultTrackerURL = Config.getConfig().getProperty(
  508.                 CLIProductBuilder.TRACKER_URL_CONFIG_PROPERTY);
  509.         if (defaultTrackerURL != null) {
  510.             trackerURL = new URL(defaultTrackerURL);
  511.         }

  512.         // parse the arguments
  513.         for (String arg : args) {
  514.             if (arg.equals(ACTION_UPDATE)) {
  515.                 isUpdate = true;
  516.             } else if (arg.equals(ACTION_SEARCH)) {
  517.                 isSearch = true;
  518.             } else if (arg.startsWith(ARGUMENT_CLASSNAME)) {
  519.                 className = arg.replace(ARGUMENT_CLASSNAME, "");
  520.             } else if (arg.startsWith(ARGUMENT_PRODUCT_ID)) {
  521.                 id = ProductId.parse(arg.replace(ARGUMENT_PRODUCT_ID, ""));
  522.                 source = id.getSource();
  523.                 type = id.getType();
  524.                 code = id.getCode();
  525.                 updateTime = id.getUpdateTime();
  526.             } else if (arg.startsWith(CLIProductBuilder.TRACKER_URL_ARGUMENT)) {
  527.                 trackerURL = new URL(arg.replace(
  528.                         CLIProductBuilder.TRACKER_URL_ARGUMENT, ""));
  529.             } else if (arg.startsWith(CLIProductBuilder.SOURCE_ARGUMENT)) {
  530.                 source = arg.replace(CLIProductBuilder.SOURCE_ARGUMENT, "");
  531.             } else if (arg.startsWith(CLIProductBuilder.TYPE_ARGUMENT)) {
  532.                 type = arg.replace(CLIProductBuilder.TYPE_ARGUMENT, "");
  533.             } else if (arg.startsWith(CLIProductBuilder.CODE_ARGUMENT)) {
  534.                 code = arg.replace(CLIProductBuilder.CODE_ARGUMENT, "");
  535.             } else if (arg.startsWith(CLIProductBuilder.UPDATE_TIME_ARGUMENT)) {
  536.                 updateTime = XmlUtils.getDate(arg.replace(
  537.                         CLIProductBuilder.UPDATE_TIME_ARGUMENT, ""));
  538.             } else if (arg.startsWith(ARGUMENT_START_ID)) {
  539.                 startid = Long.valueOf(arg.replace(ARGUMENT_START_ID, ""));
  540.             }
  541.         }

  542.         // run the search or update
  543.         ProductTracker tracker = new ProductTracker(trackerURL);
  544.         if (isUpdate && isSearch) {
  545.             LOGGER.severe("Both " + ACTION_UPDATE + " and " + ACTION_SEARCH
  546.                     + " present, only one allowed");
  547.             System.exit(1);
  548.         } else if (!isUpdate && !isSearch) {
  549.             LOGGER.severe("Neither " + ACTION_UPDATE + " nor " + ACTION_SEARCH
  550.                     + " present, one is required");
  551.             System.exit(1);
  552.         } else if (isUpdate) {
  553.             LOGGER.info("Reading update message from STDIN...");
  554.             updateMessage = new String(StreamUtils.readStream(System.in));
  555.             if (id == null) {
  556.                 id = new ProductId(source, type, code, updateTime);
  557.             }
  558.             System.out.println(tracker.sendUpdateXML(new ProductTrackerUpdate(
  559.                     trackerURL, id, className, updateMessage)));
  560.             // LOGGER.info("Update id=" + update.getSequenceNumber());
  561.         } else if (isSearch) {
  562.             // default to search action

  563.             /*
  564.              * LOGGER.info("Searching for source=" + source + ", type=" + type +
  565.              * ", code=" + code + ", updateTime=" + updateTime + ", className="
  566.              * + className + ", startid=" + startid);
  567.              */
  568.             System.out.println(tracker.getUpdateXML(source, type, code,
  569.                     updateTime, className, startid));
  570.         }
  571.     }

  572.     /** Usage for ProductTracker
  573.      * @return CLI usage
  574.      */
  575.     public static String getUsage() {
  576.         StringBuffer buf = new StringBuffer();

  577.         buf.append("[--search]               search a tracker (default)\n");
  578.         buf.append("                         when searching, include at least one parameter\n");
  579.         buf.append("[--update]               send an update to a tracker\n");
  580.         buf.append("                         when updating, productid and class are required\n");
  581.         buf.append("                         the update message is read from STDIN\n");
  582.         buf.append("\n");

  583.         buf.append("--trackerURL=URL         which tracker to search or update\n");
  584.         buf.append("[--productid=URN]        the product to search or update\n");
  585.         buf.append("[--source=SOURCE]        a product source to search\n");
  586.         buf.append("[--type=TYPE]            a product type to search\n");
  587.         buf.append("[--code=CODE]            a product code to search\n");
  588.         buf.append("[--updateTime=TIME]      a product update time to search\n");
  589.         buf.append("[--class=MODULE]         a module to search or send an update\n");
  590.         buf.append("[--startid=SEQ]          only return updates with sequence number > SEQ\n");
  591.         buf.append("\n");

  592.         return buf.toString();
  593.     }

  594. }