HeartbeatListener.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.earthquake.product.Product;
  3. import gov.usgs.util.Config;

  4. import java.util.Date;
  5. import java.util.Map;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.io.File;
  8. import java.io.IOException;
  9. import java.lang.management.ManagementFactory;
  10. import java.lang.management.MemoryMXBean;
  11. import java.lang.management.MemoryUsage;
  12. import java.util.Iterator;
  13. import java.util.logging.Logger;

  14. import javax.json.Json;
  15. import javax.json.JsonObjectBuilder;


  16. /**
  17.  * Heartbeat Listener stores heartbeat messages and writes them to a heartbeat
  18.  * file when a product is received
  19.  *
  20.  * @author tene
  21.  *
  22.  */
  23. public class HeartbeatListener extends DefaultNotificationListener {

  24.     private static final Logger LOGGER = Logger
  25.             .getLogger(HeartbeatListener.class.getName());

  26.     /** Storage for heartbeat components */
  27.     private static Map<String, HeartbeatStatus> HASH_HEARTBEATS = new ConcurrentHashMap<>();

  28.     /** Configurable property for heartbeat fully qualified directory+filename. */
  29.     public static final String HEARTBEAT_FILENAME_PROPERTY = "heartbeatFilename";

  30.     /** Default heartbeat directory. */
  31.     public static final String DEFAULT_HEARTBEAT_FILENAME = "heartbeat.dat";

  32.     /** Default timeout for HeartbeatStatus key/value pairs. Zero = disabled */
  33.     public static final String DEFAULT_STORAGE_TIMEOUT = "0";

  34.     /**
  35.      * Default schedule interval for HeartbeatStatus key/value pairs cleanup. 30
  36.      * minutes
  37.      */
  38.     public static final String DEFAULT_CLEANUP_INTERVAL = "1800000";

  39.     /** Configurable property for heartbeat key/value expiration */
  40.     public static final String HEARTBEAT_TIMEOUT_PROPERTY = "heartbeatTimeout";

  41.     /** Flag listeners are listening */
  42.     private static boolean LISTENING = false;

  43.     /** Hearbeat registered file. */
  44.     private File heartbeatFile;

  45.     /** Timeout for expiration of key/value pairs */
  46.     private long storageTimeout;

  47.     /**
  48.      * Create a new HeartbeatListener.
  49.      *
  50.      * Sets up the includeTypes list to contain "heartbeat".
  51.      * @throws Exception if error occurs
  52.      */
  53.     public HeartbeatListener() throws Exception {
  54.         LISTENING = true;
  55.         heartbeatFile = new File(DEFAULT_HEARTBEAT_FILENAME);
  56.         storageTimeout = Long.valueOf(DEFAULT_STORAGE_TIMEOUT);
  57.         this.getIncludeTypes().add("heartbeat");
  58.     }

  59.     /**
  60.      * @return map of component and heartbeat status.
  61.      */
  62.     protected static Map<String, HeartbeatStatus> getHeartbeats() {
  63.         return HASH_HEARTBEATS;
  64.     }

  65.     /**
  66.      * heartbeat onProduct processing writes to heartbeat file
  67.      */
  68.     @Override
  69.     public void onProduct(final Product product) throws Exception {

  70.         // track product
  71.         sendHeartbeatMessage(this.getName(), "lastHeartbeat", product.getId()
  72.                 .toString());

  73.         // track current memory usage
  74.         MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
  75.         MemoryUsage heapMemory = memoryMXBean.getHeapMemoryUsage();
  76.         MemoryUsage nonHeapMemory = memoryMXBean.getNonHeapMemoryUsage();

  77.         long heapUsed = heapMemory.getUsed();
  78.         long nonHeapUsed = nonHeapMemory.getUsed();
  79.         long heapCommitted = heapMemory.getCommitted();
  80.         long nonHeapCommitted = nonHeapMemory.getCommitted();
  81.         long heapMax = heapMemory.getMax();
  82.         long nonHeapMax = nonHeapMemory.getMax();
  83.         long totalUsed = heapUsed + nonHeapUsed;
  84.         long totalCommitted = heapCommitted + nonHeapCommitted;
  85.         long totalMax = heapMax + nonHeapMax;

  86.         sendHeartbeatMessage(this.getName(), "totalUsed", Long
  87.                 .toString(totalUsed));
  88.         sendHeartbeatMessage(this.getName(), "totalCommitted", Long
  89.                 .toString(totalCommitted));
  90.         sendHeartbeatMessage(this.getName(), "totalMax", Long
  91.                 .toString(totalMax));

  92.         // write heartbeat information to file
  93.         this.writeHeartbeat();

  94.     }

  95.     /**
  96.      * Send heartbeat data to heartbeat listener
  97.      *
  98.      * @param component String component
  99.      * @param key Heartbeat key
  100.      * @param value Heartbeat value
  101.      */
  102.     public static void sendHeartbeatMessage(final String component,
  103.             final String key, final String value) {

  104.         if (!LISTENING) {
  105.             return;
  106.         }
  107.         HeartbeatStatus objHeartbeat;

  108.         String heartbeatKey = component;
  109.         if (heartbeatKey == null) {
  110.             heartbeatKey = "<null>";
  111.         }

  112.         // register this heartbeat in temporary storage
  113.         if (HASH_HEARTBEATS.containsKey(heartbeatKey)) {
  114.             objHeartbeat = HASH_HEARTBEATS.get(heartbeatKey);
  115.         } else {
  116.             objHeartbeat = new HeartbeatStatus();
  117.             HASH_HEARTBEATS.put(heartbeatKey, objHeartbeat);
  118.         }

  119.         // store the heartbeat key/value in temporary storage
  120.         objHeartbeat.updateStatus(key, value);
  121.     }

  122.     /**
  123.      * Write heartbeat data for all components to the heartbeat file
  124.      *
  125.      * @return true
  126.      * @throws IOException if IO error occurs
  127.      */
  128.     public boolean writeHeartbeat() throws IOException {
  129.         String tempFileName = heartbeatFile.getName() + "-temp";
  130.         File tempFile = new File(tempFileName);

  131.         gov.usgs.util.FileUtils.writeFileThenMove(tempFile, heartbeatFile, this
  132.                 .formatHeartbeatOutput().getBytes());

  133.         return true;
  134.     }

  135.     /**
  136.      * Self-configure HeartbeatListener object
  137.      */
  138.     @Override
  139.     public void configure(Config config) throws Exception {
  140.         // let default notification listener configure itself
  141.         super.configure(config);

  142.         heartbeatFile = new File(config.getProperty(
  143.                 HEARTBEAT_FILENAME_PROPERTY, DEFAULT_HEARTBEAT_FILENAME));
  144.         LOGGER.config("[" + getName() + "] heartbeat file = "
  145.                 + heartbeatFile.getCanonicalPath());

  146.         storageTimeout = Long.valueOf(config.getProperty(
  147.                 HEARTBEAT_TIMEOUT_PROPERTY, DEFAULT_STORAGE_TIMEOUT));
  148.         LOGGER.config("[" + getName() + "] heartbeat timeout = "
  149.                 + storageTimeout + "ms");
  150.     }

  151.     /**
  152.      * @return JSON-formatted output from the map of components and their values
  153.      */
  154.     public String formatHeartbeatOutput() {
  155.         JsonObjectBuilder builder = Json.createObjectBuilder();
  156.         for (String key : HASH_HEARTBEATS.keySet()) {
  157.             HeartbeatStatus status = HASH_HEARTBEATS.get(key);
  158.             builder.add(key, status == null ? null : status.toJsonObject());
  159.         }
  160.         return builder.build().toString();
  161.     }

  162.     /**
  163.      * purge heartbeat key/values older than storageTimeout, also purging empty
  164.      * heartbeats
  165.      */
  166.     @Override
  167.     public void cleanup() throws Exception {
  168.         super.cleanup();

  169.         if (this.storageTimeout == 0) {
  170.             return;
  171.         }

  172.         Map<String, HeartbeatStatus> hashHeartbeats = getHeartbeats();
  173.         HeartbeatStatus objHeartbeat;
  174.         String component;
  175.         Iterator<String> itComponents = hashHeartbeats.keySet().iterator();
  176.         Date purgeDate = new Date(new Date().getTime() - this.storageTimeout);

  177.         // iterate through map of components
  178.         while (itComponents.hasNext()) {
  179.             component = itComponents.next();

  180.             // purge old key/value entries within objHeartbeat
  181.             objHeartbeat = hashHeartbeats.get(component);
  182.             objHeartbeat.clearDataOlderThanDate(purgeDate);

  183.             // if purge has left a objHeartheat with no remaining key/value
  184.             // entries,
  185.             // remove the objHeartbeat (i.e. the component)
  186.             if (objHeartbeat.isEmpty()) {
  187.                 itComponents.remove();
  188.             }

  189.         } // END while more components

  190.     }

  191.     /** @return heartbeatFile */
  192.     public File getHeartbeatFile() {
  193.         return heartbeatFile;
  194.     }

  195.     /** @param heartbeatFile to set */
  196.     public void setHeartbeatFile(File heartbeatFile) {
  197.         this.heartbeatFile = heartbeatFile;
  198.     }

  199.     /** @return storageTimeout */
  200.     public long getStorageTimeout() {
  201.         return storageTimeout;
  202.     }

  203.     /** @param storageTimeout to set */
  204.     public void setStorageTimeout(long storageTimeout) {
  205.         this.storageTimeout = storageTimeout;
  206.     }

  207. }