// SocketProductSender.java
/*
* SocketProductSender
*/
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.io.BinaryIO;
import gov.usgs.earthquake.product.io.BinaryProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.XmlProductHandler;
import gov.usgs.earthquake.util.TimeoutOutputStream;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.logging.Logger;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
/**
 * Send Products to SocketProductReceivers.
 *
 * <p>
 * A new socket connection is opened for every product that is sent and is
 * closed again before {@link #sendProduct(Product)} returns.
 *
 * <p>
 * The SocketProductSender implements the Configurable interface and uses the
 * following configuration parameters:
 *
 * <dl>
 * <dt>host</dt>
 * <dd>(Required) The IP address or hostname of a SocketProductReceiver.</dd>
 *
 * <dt>port</dt>
 * <dd>(Optional, default=11235) The port on host of a SocketProductReceiver</dd>
 *
 * <dt>connectTimeout</dt>
 * <dd>(Optional, default=15000) Connect timeout in milliseconds.</dd>
 *
 * <dt>readTimeout</dt>
 * <dd>(Optional, default=15000) Socket read timeout in milliseconds.</dd>
 *
 * <dt>writeTimeout</dt>
 * <dd>(Optional, default=-1) Write timeout; only applied when greater than
 * zero.</dd>
 *
 * <dt>binaryFormat</dt>
 * <dd>(Optional, default=false) Send products in binary format (true) or xml
 * format (false).</dd>
 *
 * <dt>enableDeflate</dt>
 * <dd>(Optional, default=true) Deflate product content sent over the wire.</dd>
 *
 * <dt>deflateLevel</dt>
 * <dd>(Optional, default=1) Compression level used when deflating.</dd>
 *
 * <dt>enablePdlProtocol</dt>
 * <dd>(Optional, default=true) Send the "PDL" protocol header and product id
 * before the product, so the receiver can report that it already has the
 * product.</dd>
 * </dl>
 *
 * @author jmfee
 *
 */
public class SocketProductSender extends DefaultConfigurable implements
        ProductSender {

    /** Logging object. */
    private static final Logger LOGGER = Logger
            .getLogger(SocketProductSender.class.getName());

    /** property for sender host */
    public static final String SENDER_HOST_PROPERTY = "host";

    /** property for sender port */
    public static final String SENDER_PORT_PROPERTY = "port";

    /** The default port number for SocketProductReceivers. */
    public static final String DEFAULT_SENDER_PORT = "11235";

    /** property for connectTimeout */
    public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";

    /** Default connection timeout */
    public static final String DEFAULT_CONNECT_TIMEOUT = "15000";

    /** property for readTimeout */
    public static final String READ_TIMEOUT_PROPERTY = "readTimeout";

    /** Default read timeout */
    public static final String DEFAULT_READ_TIMEOUT = "15000";

    /** property for writeTimeout */
    public static final String WRITE_TIMEOUT_PROPERTY = "writeTimeout";

    /** Default write timeout */
    public static final String DEFAULT_WRITE_TIMEOUT = "-1";

    /** Property name to configure binary or xml format. */
    public static final String BINARY_FORMAT_PROPERTY = "binaryFormat";

    /** Default value for whether to use binary format. */
    public static final String BINARY_FORMAT_DEFAULT = "false";

    /** Property name to configure deflate compression. */
    public static final String ENABLE_DEFLATE_PROPERTY = "enableDeflate";

    /** Default value for whether to use deflate compression. */
    public static final String ENABLE_DEFLATE_DEFAULT = "true";

    /** property for deflateLevel */
    public static final String DEFLATE_LEVEL_PROPERTY = "deflateLevel";

    /** Default deflate level */
    public static final String DEFLATE_LEVEL_DEFAULT = "1";

    /** Property to enablePdlProtocol */
    public static final String ENABLE_PDL_PROTOCOL_PROPERTY = "enablePdlProtocol";

    /** Default for enable pdl protocol */
    public static final String DEFAULT_ENABLE_PDL_PROTOCOL = "true";

    /** Byte array for protocol header */
    public static final byte[] PROTOCOL_HEADER = { 'P', 'D', 'L' };

    /** Static var for v0.1 protocol */
    public static final String PROTOCOL_VERSION_0_1 = "v0.1";

    /** Static var for unknown product */
    public static final String UNKNOWN_PRODUCT = "Unknown product";

    /** Static var for already having the product */
    public static final String ALREADY_HAVE_PRODUCT = "Already have product";

    /** Static var for a receive error */
    public static final String RECEIVE_ERROR = "Error receiving product";

    /** Whether to store in binary format (true), or xml format (false). */
    private boolean binaryFormat = false;

    /** Whether to deflate product sent over the wire. */
    private boolean enableDeflate = true;

    /** Compression level when deflating products. */
    private int deflateLevel = 1;

    /** Whether to send the "PDL" protocol header before the product. */
    private boolean enablePdlProtocol = true;

    /** The remote hostname or ip address. */
    private String host = null;

    /** The remote port. */
    private int port = -1; // -1 is invalid. This better be overridden.

    /** How long to wait before connecting, in milliseconds. */
    private int connectTimeout = 15000;

    /** How long to block while reading, before timing out. */
    private int readTimeout = 15000;

    /** How long to block while writing, before timing out. */
    private int writeTimeout = -1;

    /**
     * Connection of the send that is currently in progress, or null.
     *
     * Written by {@link #sendProduct(Product)} and read by {@link #shutdown()},
     * which may run on a different thread; volatile so that shutdown sees the
     * current value.
     */
    private volatile Socket socket = null;

    /**
     * Construct a new ProductSender with default connection timeout.
     *
     * @param host Host of product sender
     * @param port Port of product sender
     */
    public SocketProductSender(final String host, final int port) {
        this(host, port, Integer.parseInt(DEFAULT_CONNECT_TIMEOUT));
    }

    /**
     * Construct a new ProductSender with default read and write timeouts
     *
     * @param host Host of product sender
     * @param port Port of product sender
     * @param connectTimeout Timeout in ms
     */
    public SocketProductSender(final String host, final int port,
            final int connectTimeout) {
        this(host, port, connectTimeout,
                Integer.parseInt(DEFAULT_READ_TIMEOUT),
                Integer.parseInt(DEFAULT_WRITE_TIMEOUT));
    }

    /**
     * Construct a new ProductSender
     *
     * @param host Host of product sender
     * @param port Port of product sender
     * @param connectTimeout connect timeout in ms
     * @param readTimeout read timeout in ms
     * @param writeTimeout write timeout in ms
     */
    public SocketProductSender(final String host, final int port,
            final int connectTimeout, final int readTimeout,
            final int writeTimeout) {
        this.host = host;
        this.port = port;
        this.connectTimeout = connectTimeout;
        this.readTimeout = readTimeout;
        this.writeTimeout = writeTimeout;
    }

    /** Empty constructor for configurable interface. */
    public SocketProductSender() {
    }

    /**
     * Construct a new ProductSender using a Config object.
     *
     * @param config Config object
     * @throws Exception if error occurs
     */
    public SocketProductSender(Config config) throws Exception {
        configure(config);
    }

    /**
     * Implement the ProductSender interface.
     *
     * Connects to host:port and sends the product in xml or binary format,
     * optionally deflated. When the PDL protocol is enabled, the product id is
     * sent first and the product is skipped if the receiver answers that it
     * already has it. After sending, one status message is read back from the
     * receiver.
     *
     * The socket, the output stream and the deflater are always released
     * before this method returns.
     *
     * @param product the product to send.
     * @throws Exception if the connection fails or times out, if the receiver
     *         sends an unexpected reply, or if the final status starts with
     *         "Error".
     */
    public void sendProduct(Product product) throws Exception {
        BinaryIO io = new BinaryIO();
        boolean sendProduct = true;
        String status = null;
        ObjectProductSource productSource = null;
        InputStream in = null;
        OutputStream out = null;
        Deflater deflater = null;
        // all work is done on this local reference; the field is only
        // published so shutdown() can interrupt a send in progress
        Socket connection = null;

        try {
            connection = new Socket();
            socket = connection;
            connection.setSoTimeout(readTimeout);
            connection.connect(new InetSocketAddress(host, port),
                    connectTimeout);
            LOGGER.info("[" + getName() + "] sending product to "
                    + connection.toString());

            productSource = new ObjectProductSource(product);

            in = new BufferedInputStream(connection.getInputStream());
            out = new BufferedOutputStream(connection.getOutputStream());
            if (writeTimeout > 0) {
                out = new TimeoutOutputStream(out, writeTimeout);
            }

            if (enablePdlProtocol) {
                LOGGER.fine("[" + getName() + "] using protocol version "
                        + PROTOCOL_VERSION_0_1);

                // flag to receiver for "PDL" protocol
                out.write(PROTOCOL_HEADER);
                io.writeString(PROTOCOL_VERSION_0_1, out);
                io.writeString(product.getId().toString(), out);
                out.flush();

                status = io.readString(in);
                if (ALREADY_HAVE_PRODUCT.equals(status)) {
                    sendProduct = false;
                } else if (UNKNOWN_PRODUCT.equals(status)) {
                    // hub doesn't have this product, send
                } else {
                    // unexpected reply, don't consider it success
                    throw new Exception("Unexpected hub reply '" + status
                            + "'");
                }
            } else {
                LOGGER.fine("[" + getName() + "] not using PDL protocol");
            }

            if (sendProduct) {
                if (enableDeflate) {
                    // keep a reference: a caller-supplied Deflater is not
                    // ended by DeflaterOutputStream.close()
                    deflater = new Deflater(deflateLevel);
                    out = new DeflaterOutputStream(out, deflater);
                }

                // make sure product handler doesn't close stream before done
                OutputStream productOut = new StreamUtils.UnclosableOutputStream(
                        out);
                if (binaryFormat) {
                    productSource
                            .streamTo(new BinaryProductHandler(productOut));
                } else {
                    productSource.streamTo(new XmlProductHandler(productOut));
                }

                // deflate requires "finish"
                if (enableDeflate) {
                    ((DeflaterOutputStream) out).finish();
                }

                // flush buffered output stream to socket
                out.flush();
                // mark end of stream for server (for xml parser)
                connection.shutdownOutput();

                // finished sending, now get status from server
                if (enablePdlProtocol) {
                    // the new way
                    status = io.readString(in);
                } else {
                    // the old way
                    status = readLegacyStatus(connection);
                }
            }

            LOGGER.info("[" + getName() + "] send complete "
                    + connection.toString() + " response=\"" + status + "\"");
        } catch (SocketTimeoutException ste) {
            throw new Exception("Error sending to " + host
                    + ", connect or read timeout", ste);
        } catch (UnknownHostException uhe) {
            throw new Exception("Unknown host " + host
                    + ", check that DNS is properly configured", uhe);
        } catch (SocketException se) {
            if (!enablePdlProtocol) {
                // check the old way
                try {
                    // possible that hub already has product
                    status = readLegacyStatus(connection);
                    // constant first: status is null at end of stream
                    if ("Product already received".equals(status)) {
                        // hub already has product
                        LOGGER.info("[" + getName()
                                + "] hub already has product");
                        return;
                    }
                } catch (Exception e) {
                    // ignore, already have an exception
                    LOGGER.log(java.util.logging.Level.FINE, "[" + getName()
                            + "] unable to read status after socket error",
                            e);
                }
            }
            throw new Exception("Error sending to " + host
                    + ", possible write timeout", se);
        } catch (Exception e) {
            throw new Exception("[" + getName() + "] error sending to " + host,
                    e);
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (Exception ignore) {
                    // best effort, the connection is closed below regardless
                }
            }
            if (deflater != null) {
                // after out.close(), which may still call finish()
                deflater.end();
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    // must not replace an exception thrown while sending
                    LOGGER.log(java.util.logging.Level.FINE, "[" + getName()
                            + "] error closing socket", e);
                }
            }
            // only clear the field if it still refers to this connection
            if (socket == connection) {
                socket = null;
            }
        }

        if (status != null && status.startsWith("Error")) {
            throw new Exception("[" + getName() + "] error sending to " + host
                    + ", message=" + status);
        }
    }

    /**
     * Read the one line status message used when the PDL protocol is disabled.
     *
     * @param connection the socket to read from.
     * @return the first line sent by the receiver, or null at end of stream.
     * @throws IOException if the line cannot be read.
     */
    private static String readLegacyStatus(final Socket connection)
            throws IOException {
        return new BufferedReader(new InputStreamReader(
                connection.getInputStream())).readLine();
    }

    /**
     * Reads the host, port, timeouts, format, compression and protocol
     * settings from config.
     *
     * Numeric and boolean values are trimmed before parsing, so surrounding
     * whitespace in the configuration is tolerated.
     *
     * @param config
     *            a Config object with host and port properties.
     * @throws Exception
     *             if host is missing, or a numeric property is not a number.
     */
    public void configure(Config config) throws Exception {
        host = config.getProperty(SENDER_HOST_PROPERTY);
        if (host == null) {
            throw new ConfigurationException("[" + getName()
                    + "] 'host' is a required configuration property");
        }
        LOGGER.config("[" + getName() + "] host is '" + host + "'");

        port = Integer.parseInt(config.getProperty(SENDER_PORT_PROPERTY,
                DEFAULT_SENDER_PORT).trim());
        LOGGER.config("[" + getName() + "] port is '" + port + "'");

        connectTimeout = Integer.parseInt(config.getProperty(
                CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT).trim());
        LOGGER.config("[" + getName() + "] connectTimeout is '"
                + connectTimeout + "'");

        readTimeout = Integer.parseInt(config.getProperty(
                READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT).trim());
        LOGGER.config("[" + getName() + "] readTimeout is '" + readTimeout
                + "'");

        writeTimeout = Integer.parseInt(config.getProperty(
                WRITE_TIMEOUT_PROPERTY, DEFAULT_WRITE_TIMEOUT).trim());
        LOGGER.config("[" + getName() + "] writeTimeout is '" + writeTimeout
                + "'");

        binaryFormat = Boolean.parseBoolean(config.getProperty(
                BINARY_FORMAT_PROPERTY, BINARY_FORMAT_DEFAULT).trim());
        LOGGER.config("[" + getName() + "] using "
                + (binaryFormat ? "binary" : "xml") + " format");

        enableDeflate = Boolean.parseBoolean(config.getProperty(
                ENABLE_DEFLATE_PROPERTY, ENABLE_DEFLATE_DEFAULT).trim());
        LOGGER.config("[" + getName() + "] enableDeflate is " + enableDeflate);

        deflateLevel = Integer.parseInt(config.getProperty(
                DEFLATE_LEVEL_PROPERTY, DEFLATE_LEVEL_DEFAULT).trim());
        LOGGER.config("[" + getName() + "] deflateLevel is " + deflateLevel);

        enablePdlProtocol = Boolean.parseBoolean(config.getProperty(
                ENABLE_PDL_PROTOCOL_PROPERTY, DEFAULT_ENABLE_PDL_PROTOCOL)
                .trim());
        LOGGER.config("[" + getName() + "] enablePdlProtocol is "
                + enablePdlProtocol);
    }

    /**
     * Makes sure the socket is closed.
     *
     * Works on a snapshot of the socket field, because a send running on
     * another thread sets the field to null when it finishes.
     */
    public void shutdown() throws Exception {
        final Socket current = socket;
        if (current == null) {
            return;
        }

        if (!current.isOutputShutdown()) {
            try {
                OutputStream stream = current.getOutputStream();
                stream.flush();
                stream.close();
            } catch (IOException iox) { /* Ignore */
            }
        }

        if (!current.isClosed()) {
            try {
                current.close();
            } catch (IOException iox) { /* Ignore */
            }
        }
    }

    /**
     * Does nothing, a socket is opened each time a product is sent.
     */
    public void startup() throws Exception {
        // Nothing to do for startup...
    }

    /**
     * @return the binaryFormat
     */
    public boolean isBinaryFormat() {
        return binaryFormat;
    }

    /**
     * @param binaryFormat
     *            the binaryFormat to set
     */
    public void setBinaryFormat(boolean binaryFormat) {
        this.binaryFormat = binaryFormat;
    }

    /**
     * @return the enableDeflate
     */
    public boolean isEnableDeflate() {
        return enableDeflate;
    }

    /**
     * @param enableDeflate
     *            the enableDeflate to set
     */
    public void setEnableDeflate(boolean enableDeflate) {
        this.enableDeflate = enableDeflate;
    }

    /**
     * @return the deflateLevel
     */
    public int getDeflateLevel() {
        return deflateLevel;
    }

    /**
     * @param deflateLevel
     *            the deflateLevel to set
     */
    public void setDeflateLevel(int deflateLevel) {
        this.deflateLevel = deflateLevel;
    }

    /**
     * @return the enablePdlProtocol
     */
    public boolean isEnablePdlProtocol() {
        return enablePdlProtocol;
    }

    /**
     * @param enablePdlProtocol
     *            the enablePdlProtocol to set
     */
    public void setEnablePdlProtocol(boolean enablePdlProtocol) {
        this.enablePdlProtocol = enablePdlProtocol;
    }

    /**
     * @return the connectTimeout
     */
    public int getConnectTimeout() {
        return connectTimeout;
    }

    /**
     * @param connectTimeout
     *            the connectTimeout to set
     */
    public void setConnectTimeout(int connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    /**
     * @return the readTimeout
     */
    public int getReadTimeout() {
        return readTimeout;
    }

    /**
     * @param readTimeout
     *            the readTimeout to set
     */
    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

    /**
     * @return the writeTimeout
     */
    public int getWriteTimeout() {
        return writeTimeout;
    }

    /**
     * @param writeTimeout
     *            the writeTimeout to set
     */
    public void setWriteTimeout(int writeTimeout) {
        this.writeTimeout = writeTimeout;
    }

    /**
     * @return the host
     */
    public String getHost() {
        return host;
    }

    /**
     * @param host
     *            the host to set
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * @return the port
     */
    public int getPort() {
        return port;
    }

    /**
     * @param port
     *            the port to set
     */
    public void setPort(int port) {
        this.port = port;
    }

}