SocketProductReceiver.java

  1. /*
  2.  * SocketProductReceiver
  3.  */
  4. package gov.usgs.earthquake.distribution;

  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import java.util.concurrent.Executors;
  8. import java.util.logging.Level;
  9. import java.util.logging.Logger;

  10. import gov.usgs.earthquake.product.io.ProductSource;
  11. import gov.usgs.util.Config;
  12. import gov.usgs.util.SocketAcceptor;
  13. import gov.usgs.util.SocketListenerInterface;

  14. /**
  15.  * Receive Products directly via a Socket.
  16.  *
  17.  * The received products are sent using a SocketProductSender.
  18.  *
  19.  * A SocketProductReceiver receives products directly and notifies listeners of
  20.  * received notifications.
  21.  *
  22.  * These are typically used on hubs with an EIDSNotificationSender or
  23.  * RelayProductReceiver.
  24.  *
  25.  * The NotificationReceiver uses a NotificationIndex to track received
  26.  * notifications, and a ProductStorage to store retrieved products.
  27.  *
  28.  * The DefaultNotificationReceiver implements the Configurable interface and
  29.  * uses the following configuration parameters:
  30.  *
  31.  * Each listener has a separate queue of notifications. Each listener is
  32.  * allocated one thread to process notifications from this queue.
  33.  */
  34. public class SocketProductReceiver extends DefaultNotificationReceiver
  35.         implements SocketListenerInterface {

  36.     private static final String THREAD_POOL_SIZE_PROPERTY = "threads";

  37.     private static final String DEFAULT_THREAD_POOL_SIZE = "10";

  38.     private static final String PRODUCT_PORT_PROPERTY = "port";

  39.     private static final String DEFAULT_PRODUCT_PORT = "11235";

  40.     private static final String SIZE_LIMIT_PROPERTY = "sizeLimit";

  41.     private static final String DEFAULT_SIZE_LIMIT = "-1";


  42.     private static final Logger LOGGER = Logger
  43.             .getLogger(SocketProductReceiver.class.getName());

  44.     private int port = -1;
  45.     private int threads = -1;
  46.     private long sizeLimit = -1;

  47.     private SocketAcceptor acceptor = null;

  48.     /**
  49.      * Default constructor setting port, threads, and sizeLimit to default
  50.      * @throws Exception if error occurs
  51.      */
  52.     public SocketProductReceiver() throws Exception {
  53.         super();
  54.         this.port = Integer.parseInt(DEFAULT_PRODUCT_PORT);
  55.         this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
  56.         this.sizeLimit = Long.parseLong(DEFAULT_SIZE_LIMIT);
  57.     }

  58.     /**
  59.      * Constructor based on config file
  60.      * @param config Configuration file
  61.      * @throws Exception if error occurs
  62.      */
  63.     public SocketProductReceiver(Config config) throws Exception {
  64.         this();
  65.         configure(config);
  66.     }

  67.     public void configure(Config config) throws Exception {
  68.         super.configure(config);
  69.         this.port = Integer.parseInt(config.getProperty(PRODUCT_PORT_PROPERTY,
  70.                 DEFAULT_PRODUCT_PORT));
  71.         LOGGER.config("[" + getName() + "]  port is '" + this.port + "'");

  72.         this.threads = Integer.parseInt(config.getProperty(
  73.                 THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
  74.         LOGGER.config("[" + getName() + "]  number of threads is '"
  75.                 + this.threads + "'");

  76.         this.sizeLimit = Long.parseLong(config.getProperty(
  77.                 SIZE_LIMIT_PROPERTY, DEFAULT_SIZE_LIMIT));
  78.         LOGGER.config("[" + getName() + "] size limite is '"
  79.                 + this.sizeLimit + "'");
  80.     }

  81.     public void startup() throws Exception {
  82.         // call DefaultNotificationReceiver startup first
  83.         super.startup();

  84.         ServerSocket socket = new ServerSocket(port);
  85.         socket.setReuseAddress(true);
  86.         acceptor = new SocketAcceptor(socket, this,
  87.                 Executors.newFixedThreadPool(threads));
  88.         // start accepting connections via socket
  89.         acceptor.start();
  90.     }

  91.     public void shutdown() throws Exception {
  92.         // stop accepting connections
  93.         acceptor.stop();

  94.         // call DefaultNotificationReceiver shutdown last
  95.         super.shutdown();
  96.     }

  97.     public void onSocket(Socket socket) {
  98.         LOGGER.info("[" + getName() + "] accepted connection "
  99.                 + socket.toString());

  100.         try {
  101.             new SocketProductReceiverHandler(this, socket).run();
  102.         } catch (Exception e) {
  103.             LOGGER.log(Level.WARNING,
  104.                     "[" + getName() + "] uncaught exception processing "
  105.                     + socket.toString(), e);
  106.         } finally {
  107.             try {
  108.                 socket.shutdownInput();
  109.             } catch (Exception e) {
  110.                 // ignore
  111.             }
  112.             try {
  113.                 socket.shutdownOutput();
  114.             } catch (Exception e) {
  115.                 // ignore
  116.             }

  117.             try {
  118.                 socket.close();
  119.             } catch (Exception e) {
  120.                 // ignore
  121.             }
  122.         }

  123.         LOGGER.info("[" + getName() + "] closed connection "
  124.                 + socket.toString());
  125.     }

  126.     /**
  127.      * Stores ProductSource as a notification, tracks it, and notifies Listeners
  128.      * @param source ProductSource
  129.      * @return String note for log file
  130.      * @throws Exception if error occurs
  131.      */
  132.     protected String storeAndNotify(final ProductSource source)
  133.             throws Exception {
  134.         Notification notification = storeProductSource(source);
  135.         if (notification != null) {
  136.             // note in log file
  137.             String message = "[" + getName() + "] received product '"
  138.                     + notification.getProductId().toString() + "'\n";

  139.             // product successfully stored, and notification index updated.
  140.             new ProductTracker(notification.getTrackerURL()).productReceived(
  141.                     this.getName(), notification.getProductId());

  142.             // notify listeners of newly available product.
  143.             notifyListeners(notification);

  144.             return message;
  145.         } else {
  146.             throw new Exception("[" + getName()
  147.                     + "] unknown error, no notification generated");
  148.         }
  149.     }

  150.     /** @return port */
  151.     public int getPort() {
  152.         return port;
  153.     }

  154.     /** @param port int to set */
  155.     public void setPort(int port) {
  156.         this.port = port;
  157.     }

  158.     /** @return sizeLimit */
  159.     public long getSizeLimit() {
  160.         return sizeLimit;
  161.     }

  162.     /** @param sizeLimit long to set */
  163.     public void setSizeLimit(long sizeLimit) {
  164.         this.sizeLimit = sizeLimit;
  165.     }

  166.     /** @return threads */
  167.     public int getThreads() {
  168.         return threads;
  169.     }

  170.     /** @param threads int to set */
  171.     public void setThreads(int threads) {
  172.         this.threads = threads;
  173.     }

  174. }