EmbeddedPDLClient.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.cube.CubeMessage;
import gov.usgs.earthquake.eids.LegacyConverter;
import gov.usgs.earthquake.eidsutil.EIDSClient;
import gov.usgs.earthquake.product.Product;

import java.io.File;

/**
 * An example of an embedded PDL client.
 *
 * Creates a notification receiver, which store it's information in a specified
 * directory. Listeners can be added to this receiver before its startup()
 * method is called, which starts the distribution process.
 */
public class EmbeddedPDLClient {

	/** name for embedded receiver, appears in log files. */
	public static final String EMBEDDED_NAME = "embeddedPDL";
	/** name for eids tracking file, in data directory. */
	public static final String EMBEDDED_TRACKING_FILE = "receiver_tracking.dat";
	/** name for notification index file, in data directory. */
	public static final String EMBEDDED_INDEX_FILE = "receiver_index.db";
	/** name for receiver storage directory, in data directory. */
	public static final String EMBEDDED_STORAGE_DIRECTORY = "receiver_storage";

	/** The notification receiver. */
	private EIDSNotificationReceiver eidsReceiver;

	/**
	 * Construct an embedded PDL client.
	 *
	 * @param dataDirectory
	 *            directory where receiver files are stored.
	 * @param serverHost
	 *            PDL hub hostname.
	 * @param serverPort
	 *            PDL hub port.
	 * @param alternateServersList
	 *            comma separated list of "hostname:port" alternate pdl hubs.
	 * @throws Exception if error occurs
	 */
	public EmbeddedPDLClient(final File dataDirectory, final String serverHost,
			final Integer serverPort, final String alternateServersList)
			throws Exception {
		EIDSClient client = new EIDSClient();
		client.setServerHost(serverHost);
		client.setServerPort(serverPort);
		client.setAlternateServersList(alternateServersList);
		client.setTrackingFileName(new File(dataDirectory,
				EMBEDDED_TRACKING_FILE).getCanonicalPath());


		eidsReceiver = new EIDSNotificationReceiver();
		eidsReceiver.setName(EMBEDDED_NAME);
		eidsReceiver.setNotificationIndex(new JDBCNotificationIndex(new File(
				dataDirectory, EMBEDDED_INDEX_FILE).getCanonicalPath()));
		eidsReceiver.setProductStorage(new FileProductStorage(new File(
				dataDirectory, EMBEDDED_STORAGE_DIRECTORY)));
		// default these to 15 minutes
		eidsReceiver.setProductStorageMaxAge(900000L);
		eidsReceiver.setReceiverCleanupInterval(900000L);
		eidsReceiver.setClient(client);
		client.addListener(eidsReceiver);
	}

	/**
	 * Get the embedded EIDSNotificationReceiver object for further
	 * configuration, adding/removing listeners, and starting/stopping
	 * distribution.
	 *
	 * @return the embedded EIDSNotificationReceiver object.
	 */
	public EIDSNotificationReceiver getReceiver() {
		return eidsReceiver;
	}

	/**
	 * Example main method that uses the EmbeddedPDLClient.
	 *
	 * @param args
	 *            not used.
	 * @throws Exception if error occurs
	 */
	public static void main(final String[] args) throws Exception {
		// disable product tracker messages
		ProductTracker.setTrackerEnabled(false);

		// client for production hub
		File dataDirectory = new File("embeddedStorage");
		String hostname = "prod01-pdl01.cr.usgs.gov";
		Integer port = 39977;
		String alternateServers = "prod02-pdl01.wr.usgs.gov:39977";

		// create embedded client
		final EmbeddedPDLClient client = new EmbeddedPDLClient(dataDirectory,
				hostname, port, alternateServers);

		// create a listener that tries to convert messages to cube
		final DefaultNotificationListener listener = new DefaultNotificationListener() {
			// convert a product to a cube message, if possible
			private final LegacyConverter converter = LegacyConverter
					.cubeConverter();

			@Override
			public void onProduct(final Product product) {
				System.err.println("Processing product "
						+ product.getId().toString());
				try {
					byte[] cubeBytes = converter.convert(product);
					if (cubeBytes != null) {
						CubeMessage cubeMessage = CubeMessage.parse(new String(
								cubeBytes));
						// CubeMessage instanceof CubeEvent
						// or CubeMessage instanceof CubeDelete
						System.err.println(cubeMessage.toCUBE());
					}
				} catch (Exception e) {
					// ignore
				}
			}
		};
		// only listen for origin messages
		listener.getIncludeTypes().add("origin");
		// name appears in log messages
		listener.setName("embeddedListener");
		// add listener index for more reliable notification across restarts
		listener.setNotificationIndex(new JDBCNotificationIndex(new File(
				dataDirectory, "embedded_listener_index.db").getCanonicalPath()));

		// add listener to receiver
		client.getReceiver().addNotificationListener(listener);

		// start
		listener.startup();
		client.getReceiver().startup();

		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				try {
					client.getReceiver().shutdown();
					listener.shutdown();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
	}
}