HeartbeatListener.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.Product;
- import gov.usgs.util.Config;
- import java.util.Date;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.io.File;
- import java.io.IOException;
- import java.lang.management.ManagementFactory;
- import java.lang.management.MemoryMXBean;
- import java.lang.management.MemoryUsage;
- import java.util.Iterator;
- import java.util.logging.Logger;
- import javax.json.Json;
- import javax.json.JsonObjectBuilder;
- /**
- * Heartbeat Listener stores heartbeat messages and writes them to a heartbeat
- * file when a product is received
- *
- * @author tene
- *
- */
- public class HeartbeatListener extends DefaultNotificationListener {
- private static final Logger LOGGER = Logger
- .getLogger(HeartbeatListener.class.getName());
- /** Storage for heartbeat components */
- private static Map<String, HeartbeatStatus> HASH_HEARTBEATS = new ConcurrentHashMap<>();
- /** Configurable property for heartbeat fully qualified directory+filename. */
- public static final String HEARTBEAT_FILENAME_PROPERTY = "heartbeatFilename";
- /** Default heartbeat directory. */
- public static final String DEFAULT_HEARTBEAT_FILENAME = "heartbeat.dat";
- /** Default timeout for HeartbeatStatus key/value pairs. Zero = disabled */
- public static final String DEFAULT_STORAGE_TIMEOUT = "0";
- /**
- * Default schedule interval for HeartbeatStatus key/value pairs cleanup. 30
- * minutes
- */
- public static final String DEFAULT_CLEANUP_INTERVAL = "1800000";
- /** Configurable property for heartbeat key/value expiration */
- public static final String HEARTBEAT_TIMEOUT_PROPERTY = "heartbeatTimeout";
- /** Flag listeners are listening */
- private static boolean LISTENING = false;
- /** Hearbeat registered file. */
- private File heartbeatFile;
- /** Timeout for expiration of key/value pairs */
- private long storageTimeout;
- /**
- * Create a new HeartbeatListener.
- *
- * Sets up the includeTypes list to contain "heartbeat".
- * @throws Exception if error occurs
- */
- public HeartbeatListener() throws Exception {
- LISTENING = true;
- heartbeatFile = new File(DEFAULT_HEARTBEAT_FILENAME);
- storageTimeout = Long.valueOf(DEFAULT_STORAGE_TIMEOUT);
- this.getIncludeTypes().add("heartbeat");
- }
- /**
- * @return map of component and heartbeat status.
- */
- protected static Map<String, HeartbeatStatus> getHeartbeats() {
- return HASH_HEARTBEATS;
- }
- /**
- * heartbeat onProduct processing writes to heartbeat file
- */
- @Override
- public void onProduct(final Product product) throws Exception {
- // track product
- sendHeartbeatMessage(this.getName(), "lastHeartbeat", product.getId()
- .toString());
- // track current memory usage
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage heapMemory = memoryMXBean.getHeapMemoryUsage();
- MemoryUsage nonHeapMemory = memoryMXBean.getNonHeapMemoryUsage();
- long heapUsed = heapMemory.getUsed();
- long nonHeapUsed = nonHeapMemory.getUsed();
- long heapCommitted = heapMemory.getCommitted();
- long nonHeapCommitted = nonHeapMemory.getCommitted();
- long heapMax = heapMemory.getMax();
- long nonHeapMax = nonHeapMemory.getMax();
- long totalUsed = heapUsed + nonHeapUsed;
- long totalCommitted = heapCommitted + nonHeapCommitted;
- long totalMax = heapMax + nonHeapMax;
- sendHeartbeatMessage(this.getName(), "totalUsed", Long
- .toString(totalUsed));
- sendHeartbeatMessage(this.getName(), "totalCommitted", Long
- .toString(totalCommitted));
- sendHeartbeatMessage(this.getName(), "totalMax", Long
- .toString(totalMax));
- // write heartbeat information to file
- this.writeHeartbeat();
- }
- /**
- * Send heartbeat data to heartbeat listener
- *
- * @param component String component
- * @param key Heartbeat key
- * @param value Heartbeat value
- */
- public static void sendHeartbeatMessage(final String component,
- final String key, final String value) {
- if (!LISTENING) {
- return;
- }
- HeartbeatStatus objHeartbeat;
- String heartbeatKey = component;
- if (heartbeatKey == null) {
- heartbeatKey = "<null>";
- }
- // register this heartbeat in temporary storage
- if (HASH_HEARTBEATS.containsKey(heartbeatKey)) {
- objHeartbeat = HASH_HEARTBEATS.get(heartbeatKey);
- } else {
- objHeartbeat = new HeartbeatStatus();
- HASH_HEARTBEATS.put(heartbeatKey, objHeartbeat);
- }
- // store the heartbeat key/value in temporary storage
- objHeartbeat.updateStatus(key, value);
- }
- /**
- * Write heartbeat data for all components to the heartbeat file
- *
- * @return true
- * @throws IOException if IO error occurs
- */
- public boolean writeHeartbeat() throws IOException {
- String tempFileName = heartbeatFile.getName() + "-temp";
- File tempFile = new File(tempFileName);
- gov.usgs.util.FileUtils.writeFileThenMove(tempFile, heartbeatFile, this
- .formatHeartbeatOutput().getBytes());
- return true;
- }
- /**
- * Self-configure HeartbeatListener object
- */
- @Override
- public void configure(Config config) throws Exception {
- // let default notification listener configure itself
- super.configure(config);
- heartbeatFile = new File(config.getProperty(
- HEARTBEAT_FILENAME_PROPERTY, DEFAULT_HEARTBEAT_FILENAME));
- LOGGER.config("[" + getName() + "] heartbeat file = "
- + heartbeatFile.getCanonicalPath());
- storageTimeout = Long.valueOf(config.getProperty(
- HEARTBEAT_TIMEOUT_PROPERTY, DEFAULT_STORAGE_TIMEOUT));
- LOGGER.config("[" + getName() + "] heartbeat timeout = "
- + storageTimeout + "ms");
- }
- /**
- * @return JSON-formatted output from the map of components and their values
- */
- public String formatHeartbeatOutput() {
- JsonObjectBuilder builder = Json.createObjectBuilder();
- for (String key : HASH_HEARTBEATS.keySet()) {
- HeartbeatStatus status = HASH_HEARTBEATS.get(key);
- builder.add(key, status == null ? null : status.toJsonObject());
- }
- return builder.build().toString();
- }
- /**
- * purge heartbeat key/values older than storageTimeout, also purging empty
- * heartbeats
- */
- @Override
- public void cleanup() throws Exception {
- super.cleanup();
- if (this.storageTimeout == 0) {
- return;
- }
- Map<String, HeartbeatStatus> hashHeartbeats = getHeartbeats();
- HeartbeatStatus objHeartbeat;
- String component;
- Iterator<String> itComponents = hashHeartbeats.keySet().iterator();
- Date purgeDate = new Date(new Date().getTime() - this.storageTimeout);
- // iterate through map of components
- while (itComponents.hasNext()) {
- component = itComponents.next();
- // purge old key/value entries within objHeartbeat
- objHeartbeat = hashHeartbeats.get(component);
- objHeartbeat.clearDataOlderThanDate(purgeDate);
- // if purge has left a objHeartheat with no remaining key/value
- // entries,
- // remove the objHeartbeat (i.e. the component)
- if (objHeartbeat.isEmpty()) {
- itComponents.remove();
- }
- } // END while more components
- }
- /** @return heartbeatFile */
- public File getHeartbeatFile() {
- return heartbeatFile;
- }
- /** @param heartbeatFile to set */
- public void setHeartbeatFile(File heartbeatFile) {
- this.heartbeatFile = heartbeatFile;
- }
- /** @return storageTimeout */
- public long getStorageTimeout() {
- return storageTimeout;
- }
- /** @param storageTimeout to set */
- public void setStorageTimeout(long storageTimeout) {
- this.storageTimeout = storageTimeout;
- }
- }