// HeartbeatListener.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Iterator;
import java.util.logging.Logger;

import javax.json.Json;
import javax.json.JsonObjectBuilder;


/**
 * Heartbeat Listener stores heartbeat messages and writes them to a heartbeat
 * file when a product is received.
 *
 * <p>
 * Heartbeat data lives in a static map shared by every instance of this
 * class; any code may contribute values through
 * {@link #sendHeartbeatMessage(String, String, String)}.
 *
 * @author tene
 *
 */
public class HeartbeatListener extends DefaultNotificationListener {

	private static final Logger LOGGER = Logger
			.getLogger(HeartbeatListener.class.getName());

	/** Storage for heartbeat components, keyed by component name. */
	private static final Map<String, HeartbeatStatus> HASH_HEARTBEATS = new ConcurrentHashMap<>();

	/** Configurable property for heartbeat fully qualified directory+filename. */
	public static final String HEARTBEAT_FILENAME_PROPERTY = "heartbeatFilename";

	/** Default heartbeat filename. */
	public static final String DEFAULT_HEARTBEAT_FILENAME = "heartbeat.dat";

	/** Default timeout for HeartbeatStatus key/value pairs. Zero = disabled */
	public static final String DEFAULT_STORAGE_TIMEOUT = "0";

	/**
	 * Default schedule interval for HeartbeatStatus key/value pairs cleanup. 30
	 * minutes
	 */
	public static final String DEFAULT_CLEANUP_INTERVAL = "1800000";

	/** Configurable property for heartbeat key/value expiration */
	public static final String HEARTBEAT_TIMEOUT_PROPERTY = "heartbeatTimeout";

	/**
	 * Flag listeners are listening. Volatile because it is set by the
	 * constructor and read by {@link #sendHeartbeatMessage} callers that may
	 * run on other threads.
	 */
	private static volatile boolean LISTENING = false;

	/** Heartbeat registered file. */
	private File heartbeatFile;

	/** Timeout (milliseconds) for expiration of key/value pairs. */
	private long storageTimeout;

	/**
	 * Create a new HeartbeatListener.
	 *
	 * Sets up the includeTypes list to contain "heartbeat", and enables
	 * storage of messages passed to {@link #sendHeartbeatMessage}.
	 *
	 * @throws Exception if error occurs
	 */
	public HeartbeatListener() throws Exception {
		LISTENING = true;
		heartbeatFile = new File(DEFAULT_HEARTBEAT_FILENAME);
		storageTimeout = Long.parseLong(DEFAULT_STORAGE_TIMEOUT);
		this.getIncludeTypes().add("heartbeat");
	}

	/**
	 * @return map of component and heartbeat status. This is the live shared
	 *         map, not a copy.
	 */
	protected static Map<String, HeartbeatStatus> getHeartbeats() {
		return HASH_HEARTBEATS;
	}

	/**
	 * heartbeat onProduct processing writes to heartbeat file.
	 *
	 * Records the product id and current JVM memory totals under this
	 * listener's name, then rewrites the heartbeat file.
	 *
	 * @param product the received product
	 * @throws Exception if the heartbeat file cannot be written
	 */
	@Override
	public void onProduct(final Product product) throws Exception {

		// track product
		sendHeartbeatMessage(this.getName(), "lastHeartbeat", product.getId()
				.toString());

		// track current memory usage
		MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
		MemoryUsage heapMemory = memoryMXBean.getHeapMemoryUsage();
		MemoryUsage nonHeapMemory = memoryMXBean.getNonHeapMemoryUsage();

		long totalUsed = heapMemory.getUsed() + nonHeapMemory.getUsed();
		long totalCommitted = heapMemory.getCommitted()
				+ nonHeapMemory.getCommitted();
		// NOTE(review): MemoryUsage#getMax() is documented to return -1 when
		// the maximum is undefined; in that case this sum is not a true limit.
		// Left as-is so reported values do not change - confirm with consumers.
		long totalMax = heapMemory.getMax() + nonHeapMemory.getMax();

		sendHeartbeatMessage(this.getName(), "totalUsed", Long
				.toString(totalUsed));
		sendHeartbeatMessage(this.getName(), "totalCommitted", Long
				.toString(totalCommitted));
		sendHeartbeatMessage(this.getName(), "totalMax", Long
				.toString(totalMax));

		// write heartbeat information to file
		this.writeHeartbeat();

	}

	/**
	 * Send heartbeat data to heartbeat listener.
	 *
	 * Does nothing until at least one HeartbeatListener has been constructed.
	 *
	 * @param component String component; null is stored under "&lt;null&gt;"
	 * @param key Heartbeat key
	 * @param value Heartbeat value
	 */
	public static void sendHeartbeatMessage(final String component,
			final String key, final String value) {

		if (!LISTENING) {
			return;
		}

		final String heartbeatKey = (component == null) ? "<null>" : component;

		// register this heartbeat in temporary storage; putIfAbsent keeps the
		// lookup-or-create step atomic so concurrent senders for a new
		// component share one HeartbeatStatus instead of overwriting each other
		HeartbeatStatus objHeartbeat = HASH_HEARTBEATS.get(heartbeatKey);
		if (objHeartbeat == null) {
			final HeartbeatStatus created = new HeartbeatStatus();
			final HeartbeatStatus existing = HASH_HEARTBEATS.putIfAbsent(
					heartbeatKey, created);
			objHeartbeat = (existing == null) ? created : existing;
		}

		// store the heartbeat key/value in temporary storage
		objHeartbeat.updateStatus(key, value);
	}

	/**
	 * Write heartbeat data for all components to the heartbeat file.
	 *
	 * The data is handed to FileUtils.writeFileThenMove together with a
	 * "-temp" sibling of the heartbeat file.
	 *
	 * @return true
	 * @throws IOException if IO error occurs
	 */
	public boolean writeHeartbeat() throws IOException {
		// keep the temp file in the same directory as the heartbeat file
		// (a null parent resolves against the working directory, matching a
		// relative heartbeat filename)
		File tempFile = new File(heartbeatFile.getParentFile(),
				heartbeatFile.getName() + "-temp");

		// encode explicitly as UTF-8 rather than the platform default charset
		byte[] content = this.formatHeartbeatOutput().getBytes(
				java.nio.charset.StandardCharsets.UTF_8);

		gov.usgs.util.FileUtils.writeFileThenMove(tempFile, heartbeatFile,
				content);

		return true;
	}

	/**
	 * Self-configure HeartbeatListener object.
	 *
	 * Reads {@link #HEARTBEAT_FILENAME_PROPERTY} and
	 * {@link #HEARTBEAT_TIMEOUT_PROPERTY}, falling back to the defaults.
	 *
	 * @param config configuration to read
	 * @throws Exception if the parent configuration fails, the file path
	 *         cannot be resolved, or the timeout is not a valid long
	 */
	@Override
	public void configure(Config config) throws Exception {
		// let default notification listener configure itself
		super.configure(config);

		heartbeatFile = new File(config.getProperty(
				HEARTBEAT_FILENAME_PROPERTY, DEFAULT_HEARTBEAT_FILENAME));
		LOGGER.config("[" + getName() + "] heartbeat file = "
				+ heartbeatFile.getCanonicalPath());

		storageTimeout = Long.parseLong(config.getProperty(
				HEARTBEAT_TIMEOUT_PROPERTY, DEFAULT_STORAGE_TIMEOUT));
		LOGGER.config("[" + getName() + "] heartbeat timeout = "
				+ storageTimeout + "ms");
	}

	/**
	 * @return JSON-formatted output from the map of components and their values
	 */
	public String formatHeartbeatOutput() {
		JsonObjectBuilder builder = Json.createObjectBuilder();
		// iterate entries rather than keys + get(): a component removed
		// concurrently by cleanup() would otherwise yield a null status
		for (Map.Entry<String, HeartbeatStatus> entry : HASH_HEARTBEATS
				.entrySet()) {
			builder.add(entry.getKey(), entry.getValue().toJsonObject());
		}
		return builder.build().toString();
	}

	/**
	 * purge heartbeat key/values older than storageTimeout, also purging empty
	 * heartbeats.
	 *
	 * Purging is skipped when storageTimeout is zero.
	 *
	 * @throws Exception if the parent cleanup fails
	 */
	@Override
	public void cleanup() throws Exception {
		super.cleanup();

		if (this.storageTimeout == 0) {
			return;
		}

		Date purgeDate = new Date(System.currentTimeMillis()
				- this.storageTimeout);

		// iterate entries rather than keys + get(), so a component removed by
		// another thread mid-iteration cannot produce a null status here
		Iterator<Map.Entry<String, HeartbeatStatus>> itComponents = getHeartbeats()
				.entrySet().iterator();
		while (itComponents.hasNext()) {
			HeartbeatStatus objHeartbeat = itComponents.next().getValue();

			// purge old key/value entries within objHeartbeat
			objHeartbeat.clearDataOlderThanDate(purgeDate);

			// if purge has left an objHeartbeat with no remaining key/value
			// entries, remove the objHeartbeat (i.e. the component)
			if (objHeartbeat.isEmpty()) {
				itComponents.remove();
			}
		}

	}

	/** @return heartbeatFile */
	public File getHeartbeatFile() {
		return heartbeatFile;
	}

	/** @param heartbeatFile to set */
	public void setHeartbeatFile(File heartbeatFile) {
		this.heartbeatFile = heartbeatFile;
	}

	/** @return storageTimeout */
	public long getStorageTimeout() {
		return storageTimeout;
	}

	/** @param storageTimeout to set */
	public void setStorageTimeout(long storageTimeout) {
		this.storageTimeout = storageTimeout;
	}

}