// SocketProductSender.java

/*
 * SocketProductSender
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.io.BinaryIO;
import gov.usgs.earthquake.product.io.BinaryProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.XmlProductHandler;
import gov.usgs.earthquake.util.TimeoutOutputStream;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.logging.Logger;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

/**
 * Send Products to SocketProductReceivers.
 *
 * A new socket is opened for every call to {@link #sendProduct(Product)} and
 * closed again before that call returns.
 *
 * The SocketProductSender implements the Configurable interface and uses the
 * following configuration parameters:
 *
 * <dl>
 * <dt>host</dt>
 * <dd>(Required) The IP address or hostname of a SocketProductReceiver.</dd>
 *
 * <dt>port</dt>
 * <dd>(Optional, default=11235) The port on host of a SocketProductReceiver</dd>
 *
 * <dt>connectTimeout</dt>
 * <dd>(Optional, default=15000) Milliseconds to wait while connecting.</dd>
 *
 * <dt>readTimeout</dt>
 * <dd>(Optional, default=15000) Milliseconds to block while reading.</dd>
 *
 * <dt>writeTimeout</dt>
 * <dd>(Optional, default=-1) Milliseconds to block while writing; a write
 * timeout is only applied when this value is greater than zero.</dd>
 *
 * <dt>binaryFormat</dt>
 * <dd>(Optional, default=false) Send products in binary format (true) or xml
 * format (false).</dd>
 *
 * <dt>enableDeflate</dt>
 * <dd>(Optional, default=true) Whether to deflate the product stream.</dd>
 *
 * <dt>deflateLevel</dt>
 * <dd>(Optional, default=1) Compression level used when deflating.</dd>
 *
 * <dt>enablePdlProtocol</dt>
 * <dd>(Optional, default=true) Whether to use the "PDL" handshake, which asks
 * the receiver if it already has the product before sending it.</dd>
 * </dl>
 *
 * @author jmfee
 *
 */
public class SocketProductSender extends DefaultConfigurable implements
		ProductSender {

	/** Logging object. */
	private static final Logger LOGGER = Logger
			.getLogger(SocketProductSender.class.getName());

	/** property for sender host */
	public static final String SENDER_HOST_PROPERTY = "host";
	/** property for sender port */
	public static final String SENDER_PORT_PROPERTY = "port";

	/** The default port number for SocketProductReceivers. */
	public static final String DEFAULT_SENDER_PORT = "11235";

	/** property for connectTimeout */
	public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
	/** Default connection timeout */
	public static final String DEFAULT_CONNECT_TIMEOUT = "15000";

	/** property for readTimeout */
	public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
	/** Default read timeout */
	public static final String DEFAULT_READ_TIMEOUT = "15000";

	/** property for writeTimeout */
	public static final String WRITE_TIMEOUT_PROPERTY = "writeTimeout";
	/** Default write timeout */
	public static final String DEFAULT_WRITE_TIMEOUT = "-1";

	/** Property name to configure binary or xml format. */
	public static final String BINARY_FORMAT_PROPERTY = "binaryFormat";
	/** Default value for whether to use binary format. */
	public static final String BINARY_FORMAT_DEFAULT = "false";

	/** Property name to configure deflate compression. */
	public static final String ENABLE_DEFLATE_PROPERTY = "enableDeflate";
	/** Default value for whether to use deflate compression. */
	public static final String ENABLE_DEFLATE_DEFAULT = "true";

	/** property for deflateLevel */
	public static final String DEFLATE_LEVEL_PROPERTY = "deflateLevel";
	/** Default deflate level */
	public static final String DEFLATE_LEVEL_DEFAULT = "1";

	/** Property to enablePdlProtocol */
	public static final String ENABLE_PDL_PROTOCOL_PROPERTY = "enablePdlProtocol";
	/** Default for enable pdl protocol */
	public static final String DEFAULT_ENABLE_PDL_PROTOCOL = "true";

	/** Byte array for the protocol header */
	public static final byte[] PROTOCOL_HEADER = { 'P', 'D', 'L' };
	/** Version string for the v0.1 protocol */
	public static final String PROTOCOL_VERSION_0_1 = "v0.1";
	/** Receiver reply: the product is unknown to the receiver */
	public static final String UNKNOWN_PRODUCT = "Unknown product";
	/** Receiver reply: the receiver already has the product */
	public static final String ALREADY_HAVE_PRODUCT = "Already have product";
	/** Receiver reply: an error occurred while receiving */
	public static final String RECEIVE_ERROR = "Error receiving product";

	/** Reply checked for when the PDL protocol is disabled. */
	private static final String LEGACY_ALREADY_RECEIVED = "Product already received";

	/** Whether to send in binary format (true), or xml format (false). */
	private boolean binaryFormat = false;

	/** Whether to deflate product sent over the wire. */
	private boolean enableDeflate = true;

	/** Compression level when deflating products. */
	private int deflateLevel = 1;

	/** Whether to use the "PDL" handshake before sending. */
	private boolean enablePdlProtocol = true;

	/** The remote hostname or ip address. */
	private String host = null;
	/** The remote port. */
	private int port = -1; // -1 is invalid. This better be overridden.
	/** How long to wait while connecting, in milliseconds. */
	private int connectTimeout = 15000;
	/** How long to block while reading, before timing out. */
	private int readTimeout = 15000;
	/** How long to block while writing, before timing out. */
	private int writeTimeout = -1;

	/** Socket of the send in progress, or null when no send is active. */
	private Socket socket = null;

	/**
	 * Construct a new ProductSender with default connection timeout.
	 *
	 * @param host Host of product sender
	 * @param port Port of product sender
	 */
	public SocketProductSender(final String host, final int port) {
		this(host, port, Integer.parseInt(DEFAULT_CONNECT_TIMEOUT));
	}

	/**
	 * Construct a new ProductSender with default read and write timeouts
	 * @param host Host of product sender
	 * @param port Port of product sender
	 * @param connectTimeout Timeout in ms
	 */
	public SocketProductSender(final String host, final int port,
			final int connectTimeout) {
		this(host, port, connectTimeout,
				Integer.parseInt(DEFAULT_READ_TIMEOUT), Integer
						.parseInt(DEFAULT_WRITE_TIMEOUT));
	}

	/**
	 *
	 * Construct a new ProductSender
	 * @param host Host of product sender
	 * @param port Port of product sender
	 * @param connectTimeout connect timeout in ms
	 * @param readTimeout read timeout in ms
	 * @param writeTimeout write timeout in ms
	 */
	public SocketProductSender(final String host, final int port,
			final int connectTimeout, final int readTimeout,
			final int writeTimeout) {
		this.host = host;
		this.port = port;
		this.connectTimeout = connectTimeout;
		this.readTimeout = readTimeout;
		this.writeTimeout = writeTimeout;
	}

	/** Empty constructor for configurable interface. */
	public SocketProductSender() {
	}

	/**
	 * Construct a new ProductSender using a Config object.
	 *
	 * @param config Config object
	 * @throws Exception if error occurs
	 */
	public SocketProductSender(Config config) throws Exception {
		configure(config);
	}

	/**
	 * Implement the ProductSender interface.
	 *
	 * Connects to host:port and sends the product, xml or binary encoded and
	 * optionally deflated. When the PDL protocol is enabled, the product id is
	 * sent first and the product itself is only sent if the receiver replies
	 * that the product is unknown. After sending, one status reply is read
	 * from the receiver.
	 *
	 * @param product
	 *            the product to send.
	 * @throws Exception
	 *             if connecting, sending or reading fails, if the receiver
	 *             sends an unexpected handshake reply, or if the final status
	 *             starts with "Error".
	 */
	public void sendProduct(Product product) throws Exception {
		final BinaryIO io = new BinaryIO();
		boolean sendProduct = true;
		String status = null;
		OutputStream out = null;
		// kept so native deflater memory can be released in finally
		Deflater deflater = null;

		// local reference so cleanup never depends on the shared field
		final Socket connection = new Socket();
		socket = connection;
		try {
			connection.setSoTimeout(readTimeout);
			connection.connect(new InetSocketAddress(host, port),
					connectTimeout);
			LOGGER.info("[" + getName() + "] sending product to "
					+ connection.toString());

			final ObjectProductSource productSource = new ObjectProductSource(
					product);

			final InputStream in = new BufferedInputStream(
					connection.getInputStream());
			out = new BufferedOutputStream(connection.getOutputStream());
			if (writeTimeout > 0) {
				out = new TimeoutOutputStream(out, writeTimeout);
			}

			if (enablePdlProtocol) {
				LOGGER.fine("[" + getName() + "] using protocol version "
						+ PROTOCOL_VERSION_0_1);

				// flag to receiver for "PDL" protocol
				out.write(PROTOCOL_HEADER);
				io.writeString(PROTOCOL_VERSION_0_1, out);
				io.writeString(product.getId().toString(), out);
				out.flush();

				status = io.readString(in);
				if (ALREADY_HAVE_PRODUCT.equals(status)) {
					sendProduct = false;
				} else if (UNKNOWN_PRODUCT.equals(status)) {
					// hub doesn't have this product, send
				} else {
					// unexpected reply, don't consider it success
					throw new Exception("Unexpected hub reply '" + status + "'");
				}
			} else {
				LOGGER.fine("[" + getName() + "] not using PDL protocol");
			}

			if (sendProduct) {
				DeflaterOutputStream deflaterOut = null;
				if (enableDeflate) {
					deflater = new Deflater(deflateLevel);
					deflaterOut = new DeflaterOutputStream(out, deflater);
					out = deflaterOut;
				}

				// make sure product handler doesn't close stream before done
				final OutputStream productOut = new StreamUtils.UnclosableOutputStream(
						out);
				if (binaryFormat) {
					productSource
							.streamTo(new BinaryProductHandler(productOut));
				} else {
					productSource.streamTo(new XmlProductHandler(productOut));
				}

				// deflate requires "finish"
				if (deflaterOut != null) {
					deflaterOut.finish();
				}

				// flush buffered output stream to socket
				out.flush();
				// mark end of stream for server (for xml parser)
				connection.shutdownOutput();

				// finished sending, now get status from server
				if (enablePdlProtocol) {
					// the new way
					status = io.readString(in);
				} else {
					// the old way
					status = readLegacyStatus(connection);
				}
			}

			LOGGER.info("[" + getName() + "] send complete "
					+ connection.toString() + " response=\"" + status + "\"");
		} catch (SocketTimeoutException ste) {
			throw new Exception("Error sending to " + host
					+ ", connect or read timeout", ste);
		} catch (UnknownHostException uhe) {
			throw new Exception("Unknown host " + host
					+ ", check that DNS is properly configured", uhe);
		} catch (SocketException se) {
			if (!enablePdlProtocol) {
				// check the old way
				try {
					// possible that hub already has product
					status = readLegacyStatus(connection);
					// readLine returns null at end of stream, compare null-safe
					if (LEGACY_ALREADY_RECEIVED.equals(status)) {
						// hub already has product
						LOGGER.info("[" + getName()
								+ "] hub already has product");
						return;
					}
				} catch (Exception e) {
					// ignore, already have an exception to report
					LOGGER.log(java.util.logging.Level.FINE, "[" + getName()
							+ "] unable to read status after socket error", e);
				}
			}
			throw new Exception("Error sending to " + host
					+ ", possible write timeout", se);
		} catch (Exception e) {
			throw new Exception("[" + getName() + "] error sending to " + host,
					e);
		} finally {
			if (out != null) {
				try {
					out.close();
				} catch (Exception ignored) {
					// best effort, socket is closed below either way
				}
			}
			if (deflater != null) {
				// a deflater supplied to DeflaterOutputStream is not ended
				// by the stream, release its native memory explicitly
				deflater.end();
			}
			try {
				connection.close();
			} catch (IOException ignored) {
				// don't let a close failure mask the original exception
			}
			socket = null;
		}

		if (status != null && status.startsWith("Error")) {
			throw new Exception("[" + getName() + "] error sending to " + host
					+ ", message=" + status);
		}
	}

	/**
	 * Read one line of status directly from the socket input stream, as used
	 * when the PDL protocol is disabled.
	 *
	 * @param connection
	 *            socket to read from.
	 * @return the line read, or null if the stream ended.
	 * @throws IOException
	 *             if reading fails.
	 */
	private static String readLegacyStatus(final Socket connection)
			throws IOException {
		return new BufferedReader(new InputStreamReader(
				connection.getInputStream())).readLine();
	}

	/**
	 * Read an integer property, ignoring surrounding whitespace.
	 *
	 * @param config
	 *            config object to read from.
	 * @param name
	 *            property name.
	 * @param defaultValue
	 *            value used when the property is not set.
	 * @return the parsed value.
	 * @throws NumberFormatException
	 *             if the value is not an integer.
	 */
	private static int getIntProperty(final Config config, final String name,
			final String defaultValue) {
		return Integer.parseInt(config.getProperty(name, defaultValue).trim());
	}

	/**
	 * Read a boolean property, ignoring surrounding whitespace.
	 *
	 * @param config
	 *            config object to read from.
	 * @param name
	 *            property name.
	 * @param defaultValue
	 *            value used when the property is not set.
	 * @return true only if the value equals "true", ignoring case.
	 */
	private static boolean getBooleanProperty(final Config config,
			final String name, final String defaultValue) {
		return Boolean.parseBoolean(config.getProperty(name, defaultValue)
				.trim());
	}

	/**
	 * Reads host, port, timeouts, format, deflate and protocol settings from
	 * config.
	 *
	 * @param config
	 *            a Config object; the host property is required, all others
	 *            fall back to their defaults.
	 * @throws Exception
	 *             if host is missing or a numeric property cannot be parsed.
	 */
	public void configure(Config config) throws Exception {
		host = config.getProperty(SENDER_HOST_PROPERTY);
		if (host == null) {
			throw new ConfigurationException("[" + getName()
					+ "] 'host' is a required configuration property");
		}
		LOGGER.config("[" + getName() + "] host is '" + host + "'");

		port = getIntProperty(config, SENDER_PORT_PROPERTY,
				DEFAULT_SENDER_PORT);
		LOGGER.config("[" + getName() + "] port is '" + port + "'");

		connectTimeout = getIntProperty(config, CONNECT_TIMEOUT_PROPERTY,
				DEFAULT_CONNECT_TIMEOUT);
		LOGGER.config("[" + getName() + "] connectTimeout is '"
				+ connectTimeout + "'");

		readTimeout = getIntProperty(config, READ_TIMEOUT_PROPERTY,
				DEFAULT_READ_TIMEOUT);
		LOGGER.config("[" + getName() + "] readTimeout is '" + readTimeout
				+ "'");

		writeTimeout = getIntProperty(config, WRITE_TIMEOUT_PROPERTY,
				DEFAULT_WRITE_TIMEOUT);
		LOGGER.config("[" + getName() + "] writeTimeout is '" + writeTimeout
				+ "'");

		binaryFormat = getBooleanProperty(config, BINARY_FORMAT_PROPERTY,
				BINARY_FORMAT_DEFAULT);
		LOGGER.config("[" + getName() + "] using "
				+ (binaryFormat ? "binary" : "xml") + " format");

		enableDeflate = getBooleanProperty(config, ENABLE_DEFLATE_PROPERTY,
				ENABLE_DEFLATE_DEFAULT);
		LOGGER.config("[" + getName() + "] enableDeflate is " + enableDeflate);

		deflateLevel = getIntProperty(config, DEFLATE_LEVEL_PROPERTY,
				DEFLATE_LEVEL_DEFAULT);
		LOGGER.config("[" + getName() + "] deflateLevel is " + deflateLevel);

		enablePdlProtocol = getBooleanProperty(config,
				ENABLE_PDL_PROTOCOL_PROPERTY, DEFAULT_ENABLE_PDL_PROTOCOL);
		LOGGER.config("[" + getName() + "] enablePdlProtocol is "
				+ enablePdlProtocol);
	}

	/**
	 * Makes sure the socket is closed.
	 */
	public void shutdown() throws Exception {
		// sendProduct sets the field to null when it finishes; work on a
		// local copy so the null check and the calls see the same object
		final Socket current = socket;
		if (current == null) {
			return;
		}
		if (!current.isOutputShutdown()) {
			try {
				current.getOutputStream().flush();
				current.getOutputStream().close();
			} catch (IOException ignored) {
				// best effort, socket is closed below
			}
		}
		if (!current.isClosed()) {
			try {
				current.close();
			} catch (IOException ignored) {
				// nothing more can be done while shutting down
			}
		}
	}

	/**
	 * Does nothing, a socket is opened each time a product is sent.
	 */
	public void startup() throws Exception {
		// Nothing to do for startup...
	}

	/**
	 * @return the binaryFormat
	 */
	public boolean isBinaryFormat() {
		return binaryFormat;
	}

	/**
	 * @param binaryFormat
	 *            the binaryFormat to set
	 */
	public void setBinaryFormat(boolean binaryFormat) {
		this.binaryFormat = binaryFormat;
	}

	/**
	 * @return the enableDeflate
	 */
	public boolean isEnableDeflate() {
		return enableDeflate;
	}

	/**
	 * @param enableDeflate
	 *            the enableDeflate to set
	 */
	public void setEnableDeflate(boolean enableDeflate) {
		this.enableDeflate = enableDeflate;
	}

	/**
	 * @return the deflateLevel
	 */
	public int getDeflateLevel() {
		return deflateLevel;
	}

	/**
	 * @param deflateLevel
	 *            the deflateLevel to set
	 */
	public void setDeflateLevel(int deflateLevel) {
		this.deflateLevel = deflateLevel;
	}

	/**
	 * @return the enablePdlProtocol
	 */
	public boolean isEnablePdlProtocol() {
		return enablePdlProtocol;
	}

	/**
	 * @param enablePdlProtocol
	 *            the enablePdlProtocol to set
	 */
	public void setEnablePdlProtocol(boolean enablePdlProtocol) {
		this.enablePdlProtocol = enablePdlProtocol;
	}

	/**
	 * @return the connectTimeout
	 */
	public int getConnectTimeout() {
		return connectTimeout;
	}

	/**
	 * @param connectTimeout
	 *            the connectTimeout to set
	 */
	public void setConnectTimeout(int connectTimeout) {
		this.connectTimeout = connectTimeout;
	}

	/**
	 * @return the readTimeout
	 */
	public int getReadTimeout() {
		return readTimeout;
	}

	/**
	 * @param readTimeout
	 *            the readTimeout to set
	 */
	public void setReadTimeout(int readTimeout) {
		this.readTimeout = readTimeout;
	}

	/**
	 * @return the writeTimeout
	 */
	public int getWriteTimeout() {
		return writeTimeout;
	}

	/**
	 * @param writeTimeout
	 *            the writeTimeout to set
	 */
	public void setWriteTimeout(int writeTimeout) {
		this.writeTimeout = writeTimeout;
	}

	/**
	 * @return the host
	 */
	public String getHost() {
		return host;
	}

	/**
	 * @param host
	 *            the host to set
	 */
	public void setHost(String host) {
		this.host = host;
	}

	/**
	 * @return the port
	 */
	public int getPort() {
		return port;
	}

	/**
	 * @param port
	 *            the port to set
	 */
	public void setPort(int port) {
		this.port = port;
	}

}