SocketProductSender.java

  1. /*
  2.  * SocketProductSender
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import gov.usgs.earthquake.product.Product;
  6. import gov.usgs.earthquake.product.io.BinaryIO;
  7. import gov.usgs.earthquake.product.io.BinaryProductHandler;
  8. import gov.usgs.earthquake.product.io.ObjectProductSource;
  9. import gov.usgs.earthquake.product.io.XmlProductHandler;
  10. import gov.usgs.earthquake.util.TimeoutOutputStream;
  11. import gov.usgs.util.Config;
  12. import gov.usgs.util.DefaultConfigurable;
  13. import gov.usgs.util.StreamUtils;

  14. import java.io.BufferedInputStream;
  15. import java.io.BufferedOutputStream;
  16. import java.io.BufferedReader;
  17. import java.io.IOException;
  18. import java.io.InputStream;
  19. import java.io.InputStreamReader;
  20. import java.io.OutputStream;
  21. import java.net.InetSocketAddress;
  22. import java.net.Socket;
  23. import java.net.SocketException;
  24. import java.net.SocketTimeoutException;
  25. import java.net.UnknownHostException;
  26. import java.util.logging.Logger;
  27. import java.util.zip.Deflater;
  28. import java.util.zip.DeflaterOutputStream;

  29. /**
  30.  * Send Products to SocketProductReceivers.
  31.  *
  32.  * The SocketProductSender implements the Configurable interface and uses the
  33.  * following configuration parameters:
  34.  *
  35.  * <dl>
  36.  * <dt>host</dt>
  37.  * <dd>(Required) The IP address or hostname of a SocketProductReceiver.</dd>
  38.  *
  39.  * <dt>port</dt>
  40.  * <dd>(Optional, default=11235) The port on host of a SocketProductReceiver</dd>
  41.  * </dl>
  42.  *
  43.  * @author jmfee
  44.  *
  45.  */
  46. public class SocketProductSender extends DefaultConfigurable implements
  47.         ProductSender {

  48.     /** Logging object. */
  49.     private static final Logger LOGGER = Logger
  50.             .getLogger(SocketProductSender.class.getName());

  51.     /** property for sender host */
  52.     public static final String SENDER_HOST_PROPERTY = "host";
  53.     /** property for sender port */
  54.     public static final String SENDER_PORT_PROPERTY = "port";

  55.     /** The default port number for SocketProductReceivers. */
  56.     public static final String DEFAULT_SENDER_PORT = "11235";

  57.     /** property for connectTimeout */
  58.     public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  59.     /** Default connection timeout */
  60.     public static final String DEFAULT_CONNECT_TIMEOUT = "15000";

  61.     /** property for readTimeout */
  62.     public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
  63.     /** Default read timeout */
  64.     public static final String DEFAULT_READ_TIMEOUT = "15000";

  65.     /** property for writeTimeout */
  66.     public static final String WRITE_TIMEOUT_PROPERTY = "writeTimeout";
  67.     /** Default write timeout */
  68.     public static final String DEFAULT_WRITE_TIMEOUT = "-1";

  69.     /** Property name to configure binary or xml format. */
  70.     public static final String BINARY_FORMAT_PROPERTY = "binaryFormat";
  71.     /** Default value for whether to use binary format. */
  72.     public static final String BINARY_FORMAT_DEFAULT = "false";

  73.     /** Property name to configure deflate compression. */
  74.     public static final String ENABLE_DEFLATE_PROPERTY = "enableDeflate";
  75.     /** Default value for whether to use deflate compression. */
  76.     public static final String ENABLE_DEFLATE_DEFAULT = "true";

  77.     /** property for deflateLevel */
  78.     public static final String DEFLATE_LEVEL_PROPERTY = "deflateLevel";
  79.     /** Default deflate level */
  80.     public static final String DEFLATE_LEVEL_DEFAULT = "1";

  81.     /** Property to enablePdlProtocol */
  82.     public static final String ENABLE_PDL_PROTOCOL_PROPERTY = "enablePdlProtocol";
  83.     /** Default for enable pdl protocol */
  84.     public static final String DEFAULT_ENABLE_PDL_PROTOCOL = "true";

  85.     /** Byte array for protocl header */
  86.     public static final byte[] PROTOCOL_HEADER = { 'P', 'D', 'L' };
  87.     /** Static var for v0.1 protocol */
  88.     public static final String PROTOCOL_VERSION_0_1 = "v0.1";
  89.     /** Static var for unknown product */
  90.     public static final String UNKNOWN_PRODUCT = "Unknown product";
  91.     /** Static var for alreadying having the product */
  92.     public static final String ALREADY_HAVE_PRODUCT = "Already have product";
  93.     /** Static var for a receive error */
  94.     public static final String RECEIVE_ERROR = "Error receiving product";

  95.     /** Whether to store in binary format (true), or xml format (false). */
  96.     private boolean binaryFormat = false;

  97.     /** Whether to deflate product sent over the wire. */
  98.     private boolean enableDeflate = true;

  99.     /** Compression level when deflating products. */
  100.     private int deflateLevel = 1;

  101.     private boolean enablePdlProtocol = true;

  102.     /** The remote hostname or ip address. */
  103.     private String host = null;
  104.     /** The remote port. */
  105.     private int port = -1; // -1 is invalid. This better be overridden.
  106.     /** How long to wait before connecting, in milliseconds. */
  107.     private int connectTimeout = 15000;
  108.     /** How long to block while reading, before timing out. */
  109.     private int readTimeout = 15000;
  110.     /** How long to block while writing, before timing out. */
  111.     private int writeTimeout = -1;

  112.     private Socket socket = null;

  113.     /**
  114.      * Construct a new ProductSender with default connection timeout.
  115.      *
  116.      * @param host Host of product sender
  117.      * @param port Port of product sender
  118.      */
  119.     public SocketProductSender(final String host, final int port) {
  120.         this(host, port, Integer.parseInt(DEFAULT_CONNECT_TIMEOUT));
  121.     }

  122.     /**
  123.      * Construct a new ProductSender with default read and write timeouts
  124.      * @param host Host of product sender
  125.      * @param port Port of product sender
  126.      * @param connectTimeout Timeout in ms
  127.      */
  128.     public SocketProductSender(final String host, final int port,
  129.             final int connectTimeout) {
  130.         this(host, port, connectTimeout,
  131.                 Integer.parseInt(DEFAULT_READ_TIMEOUT), Integer
  132.                         .parseInt(DEFAULT_WRITE_TIMEOUT));
  133.     }

  134.     /**
  135.      *
  136.      * Construct a new ProductSender
  137.      * @param host Host of product sender
  138.      * @param port Port of product sender
  139.      * @param connectTimeout connect timeout in ms
  140.      * @param readTimeout read timeout in ms
  141.      * @param writeTimeout write timeout in ms
  142.      */
  143.     public SocketProductSender(final String host, final int port,
  144.             final int connectTimeout, final int readTimeout,
  145.             final int writeTimeout) {
  146.         this.host = host;
  147.         this.port = port;
  148.         this.connectTimeout = connectTimeout;
  149.         this.readTimeout = readTimeout;
  150.         this.writeTimeout = writeTimeout;
  151.     }

  152.     /** Empty constructor for configurable interface. */
  153.     public SocketProductSender() {
  154.     }

  155.     /**
  156.      * Construct a new ProductSender using a Config object.
  157.      *
  158.      * @param config Config object
  159.      * @throws Exception if error occurs
  160.      */
  161.     public SocketProductSender(Config config) throws Exception {
  162.         configure(config);
  163.     }

  164.     /**
  165.      * Implement the ProductSender interface.
  166.      *
  167.      * Connects to host:port and sends a Deflaterped xml encoded Product. There
  168.      * is no direct response over the socket at this time.
  169.      *
  170.      * Updates may be retrieved from a ProductTracker.
  171.      */
  172.     public void sendProduct(Product product) throws Exception {
  173.         BinaryIO io = new BinaryIO();
  174.         boolean sendProduct = true;
  175.         String status = null;
  176.         ObjectProductSource productSource = null;
  177.         InputStream in = null;
  178.         OutputStream out = null;
  179.         try {
  180.             socket = new Socket();
  181.             socket.setSoTimeout(readTimeout);
  182.             socket.connect(new InetSocketAddress(host, port), connectTimeout);
  183.             LOGGER.info("[" + getName() + "] sending product to "
  184.                     + socket.toString());

  185.             productSource = new ObjectProductSource(product);

  186.             in = new BufferedInputStream(socket.getInputStream());
  187.             out = new BufferedOutputStream(socket.getOutputStream());
  188.             if (writeTimeout > 0) {
  189.                 out = new TimeoutOutputStream(out, writeTimeout);
  190.             }

  191.             if (enablePdlProtocol) {
  192.                 LOGGER.fine("[" + getName() + "] using protocol version "
  193.                         + PROTOCOL_VERSION_0_1);

  194.                 // flag to receiver for "PDL" protocol
  195.                 out.write(PROTOCOL_HEADER);
  196.                 io.writeString(PROTOCOL_VERSION_0_1, out);
  197.                 io.writeString(product.getId().toString(), out);
  198.                 out.flush();

  199.                 status = io.readString(in);
  200.                 if (ALREADY_HAVE_PRODUCT.equals(status)) {
  201.                     sendProduct = false;
  202.                 } else if (UNKNOWN_PRODUCT.equals(status)) {
  203.                     // hub doesn't have this product, send
  204.                 } else {
  205.                     // unexpected reply, don't consider it success
  206.                     throw new Exception("Unexpected hub reply '" + status + "'");
  207.                 }
  208.             } else {
  209.                 LOGGER.fine("[" + getName() + "] not using PDL protocol");
  210.             }

  211.             if (sendProduct) {
  212.                 if (enableDeflate) {
  213.                     out = new DeflaterOutputStream(out, new Deflater(
  214.                             deflateLevel));
  215.                 }

  216.                 // make sure product handler doesn't close stream before done
  217.                 OutputStream productOut = new StreamUtils.UnclosableOutputStream(
  218.                         out);
  219.                 if (binaryFormat) {
  220.                     productSource
  221.                             .streamTo(new BinaryProductHandler(productOut));
  222.                 } else {
  223.                     productSource.streamTo(new XmlProductHandler(productOut));
  224.                 }

  225.                 // deflate requires "finish"
  226.                 if (enableDeflate) {
  227.                     ((DeflaterOutputStream) out).finish();
  228.                 }

  229.                 // flush buffered output stream to socket
  230.                 out.flush();
  231.                 // mark end of stream for server (for xml parser)
  232.                 socket.shutdownOutput();

  233.                 // finished sending, now get status from server
  234.                 if (enablePdlProtocol) {
  235.                     // the new way
  236.                     status = io.readString(in);
  237.                 } else {
  238.                     // the old way
  239.                     status = new BufferedReader(new InputStreamReader(
  240.                             socket.getInputStream())).readLine();
  241.                 }
  242.             }

  243.             LOGGER.info("[" + getName() + "] send complete "
  244.                     + socket.toString() + " response=\"" + status + "\"");
  245.         } catch (SocketTimeoutException ste) {
  246.             throw new Exception("Error sending to " + host
  247.                     + ", connect or read timeout", ste);
  248.         } catch (UnknownHostException uhe) {
  249.             throw new Exception("Unknown host " + host
  250.                     + ", check that DNS is properly configured", uhe);
  251.         } catch (SocketException se) {
  252.             if (!enablePdlProtocol) {
  253.                 // check the old way
  254.                 try {
  255.                     // possible that hub already has product
  256.                     status = new BufferedReader(new InputStreamReader(
  257.                             socket.getInputStream())).readLine();
  258.                     if (status.equals("Product already received")) {
  259.                         // hub already has product
  260.                         LOGGER.info("[" + getName()
  261.                                 + "] hub already has product");
  262.                         return;
  263.                     }
  264.                 } catch (Exception e) {
  265.                     // ignore, already have an exception
  266.                     e.printStackTrace();
  267.                 }
  268.             }
  269.             throw new Exception("Error sending to " + host
  270.                     + ", possible write timeout", se);
  271.         } catch (Exception e) {
  272.             throw new Exception("[" + getName() + "] error sending to " + host,
  273.                     e);
  274.         } finally {
  275.             try {
  276.                 out.close();
  277.             } catch (Exception ignore) {
  278.             }
  279.             socket.close();
  280.             socket = null;
  281.         }

  282.         if (status != null && status.startsWith("Error")) {
  283.             throw new Exception("[" + getName() + "] error sending to " + host
  284.                     + ", message=" + status);
  285.         }
  286.     }

  287.     /**
  288.      * Reads the host and port from config.
  289.      *
  290.      * @param config
  291.      *            a Config object with host and port properties.
  292.      */
  293.     public void configure(Config config) throws Exception {
  294.         host = config.getProperty(SENDER_HOST_PROPERTY);
  295.         if (host == null) {
  296.             throw new ConfigurationException("[" + getName()
  297.                     + "] 'host' is a required configuration property");
  298.         }
  299.         LOGGER.config("[" + getName() + "] host is '" + host + "'");

  300.         port = Integer.parseInt(config.getProperty(SENDER_PORT_PROPERTY,
  301.                 DEFAULT_SENDER_PORT).trim());
  302.         LOGGER.config("[" + getName() + "] port is '" + port + "'");

  303.         connectTimeout = Integer.parseInt(config.getProperty(
  304.                 CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
  305.         LOGGER.config("[" + getName() + "] connectTimeout is '"
  306.                 + connectTimeout + "'");

  307.         readTimeout = Integer.parseInt(config.getProperty(
  308.                 READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
  309.         LOGGER.config("[" + getName() + "] readTimeout is '" + readTimeout
  310.                 + "'");

  311.         writeTimeout = Integer.parseInt(config.getProperty(
  312.                 WRITE_TIMEOUT_PROPERTY, DEFAULT_WRITE_TIMEOUT));
  313.         LOGGER.config("[" + getName() + "] writeTimeout is '" + writeTimeout
  314.                 + "'");

  315.         binaryFormat = Boolean.valueOf(config.getProperty(
  316.                 BINARY_FORMAT_PROPERTY, BINARY_FORMAT_DEFAULT));
  317.         LOGGER.config("[" + getName() + "] using "
  318.                 + (binaryFormat ? "binary" : "xml") + " format");

  319.         enableDeflate = Boolean.valueOf(config.getProperty(
  320.                 ENABLE_DEFLATE_PROPERTY, ENABLE_DEFLATE_DEFAULT));
  321.         LOGGER.config("[" + getName() + "] enableDeflate is " + enableDeflate);

  322.         deflateLevel = Integer.valueOf(config.getProperty(
  323.                 DEFLATE_LEVEL_PROPERTY, DEFLATE_LEVEL_DEFAULT));
  324.         LOGGER.config("[" + getName() + "] deflateLevel is " + deflateLevel);

  325.         enablePdlProtocol = Boolean.valueOf(config.getProperty(
  326.                 ENABLE_PDL_PROTOCOL_PROPERTY, DEFAULT_ENABLE_PDL_PROTOCOL));
  327.         LOGGER.config("[" + getName() + "] enablePdlProtocol is "
  328.                 + enablePdlProtocol);
  329.     }

  330.     /**
  331.      * Makes sure the socket is closed.
  332.      */
  333.     public void shutdown() throws Exception {
  334.         if (socket != null) {
  335.             if (!socket.isOutputShutdown()) {
  336.                 try {
  337.                     socket.getOutputStream().flush();
  338.                     socket.getOutputStream().close();
  339.                 } catch (IOException iox) { /* Ignore */
  340.                 }
  341.             }
  342.             if (!socket.isClosed()) {
  343.                 try {
  344.                     socket.close();
  345.                 } catch (IOException iox) { /* Ignore */
  346.                 }
  347.             }
  348.         }
  349.     }

  350.     /**
  351.      * Does nothing, a socket is opened each time a product is sent.
  352.      */
  353.     public void startup() throws Exception {
  354.         // Nothing to do for startup...
  355.     }

  356.     /**
  357.      * @return the binaryFormat
  358.      */
  359.     public boolean isBinaryFormat() {
  360.         return binaryFormat;
  361.     }

  362.     /**
  363.      * @param binaryFormat
  364.      *            the binaryFormat to set
  365.      */
  366.     public void setBinaryFormat(boolean binaryFormat) {
  367.         this.binaryFormat = binaryFormat;
  368.     }

  369.     /**
  370.      * @return the enableDeflate
  371.      */
  372.     public boolean isEnableDeflate() {
  373.         return enableDeflate;
  374.     }

  375.     /**
  376.      * @param enableDeflate
  377.      *            the enableDeflate to set
  378.      */
  379.     public void setEnableDeflate(boolean enableDeflate) {
  380.         this.enableDeflate = enableDeflate;
  381.     }

  382.     /**
  383.      * @return the deflateLevel
  384.      */
  385.     public int getDeflateLevel() {
  386.         return deflateLevel;
  387.     }

  388.     /**
  389.      * @param deflateLevel
  390.      *            the deflateLevel to set
  391.      */
  392.     public void setDeflateLevel(int deflateLevel) {
  393.         this.deflateLevel = deflateLevel;
  394.     }

  395.     /**
  396.      * @return the enablePdlProtocol
  397.      */
  398.     public boolean isEnablePdlProtocol() {
  399.         return enablePdlProtocol;
  400.     }

  401.     /**
  402.      * @param enablePdlProtocol
  403.      *            the enablePdlProtocol to set
  404.      */
  405.     public void setEnablePdlProtocol(boolean enablePdlProtocol) {
  406.         this.enablePdlProtocol = enablePdlProtocol;
  407.     }

  408.     /**
  409.      * @return the connectTimeout
  410.      */
  411.     public int getConnectTimeout() {
  412.         return connectTimeout;
  413.     }

  414.     /**
  415.      * @param connectTimeout
  416.      *            the connectTimeout to set
  417.      */
  418.     public void setConnectTimeout(int connectTimeout) {
  419.         this.connectTimeout = connectTimeout;
  420.     }

  421.     /**
  422.      * @return the readTimeout
  423.      */
  424.     public int getReadTimeout() {
  425.         return readTimeout;
  426.     }

  427.     /**
  428.      * @param readTimeout
  429.      *            the readTimeout to set
  430.      */
  431.     public void setReadTimeout(int readTimeout) {
  432.         this.readTimeout = readTimeout;
  433.     }

  434.     /**
  435.      * @return the writeTimeout
  436.      */
  437.     public int getWriteTimeout() {
  438.         return writeTimeout;
  439.     }

  440.     /**
  441.      * @param writeTimeout
  442.      *            the writeTimeout to set
  443.      */
  444.     public void setWriteTimeout(int writeTimeout) {
  445.         this.writeTimeout = writeTimeout;
  446.     }

  447.     /**
  448.      * @return the host
  449.      */
  450.     public String getHost() {
  451.         return host;
  452.     }

  453.     /**
  454.      * @param host
  455.      *            the host to set
  456.      */
  457.     public void setHost(String host) {
  458.         this.host = host;
  459.     }

  460.     /**
  461.      * @return the port
  462.      */
  463.     public int getPort() {
  464.         return port;
  465.     }

  466.     /**
  467.      * @param port
  468.      *            the port to set
  469.      */
  470.     public void setPort(int port) {
  471.         this.port = port;
  472.     }

  473. }