AdminSocketServer.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.util.DefaultConfigurable;
  3. import gov.usgs.util.SocketAcceptor;
  4. import gov.usgs.util.SocketListenerInterface;
  5. import gov.usgs.util.StreamUtils;

  6. import java.io.BufferedReader;
  7. import java.io.InputStream;
  8. import java.io.InputStreamReader;
  9. import java.io.OutputStream;
  10. import java.net.ServerSocket;
  11. import java.net.Socket;
  12. import java.util.Iterator;
  13. import java.util.Map;
  14. import java.util.concurrent.Executors;
  15. import java.util.logging.Level;
  16. import java.util.logging.Logger;

  17. /**
  18.  * Telnet to this socket to get a "command prompt".
  19.  *
  20.  * @author jmfee
  21.  */
  22. public class AdminSocketServer extends DefaultConfigurable implements
  23.         SocketListenerInterface {

  24.     private static final Logger LOGGER = Logger
  25.             .getLogger(AdminSocketServer.class.getName());

  26.     /** Variable for default thread pool size */
  27.     private static final int DEFAULT_THREAD_POOL_SIZE = 10;
  28.     /** Variable for default admin port */
  29.     private static final int DEFAULT_ADMIN_PORT = 11111;

  30.     private int port = -1;
  31.     private int threads = -1;
  32.     private SocketAcceptor acceptor = null;

  33.     /** the client this server is providing stats for. */
  34.     private ProductClient client = null;

  35.     /** Initializes socket with default thread pool size and port */
  36.     public AdminSocketServer() {
  37.         this(DEFAULT_ADMIN_PORT, DEFAULT_THREAD_POOL_SIZE, null);
  38.     }

  39.     /** Initializes socket with custom port, threads, and client
  40.      * @param port Admind port
  41.      * @param threads Thread pool size
  42.      * @param client Product Client
  43.      */
  44.     public AdminSocketServer(final int port, final int threads,
  45.             final ProductClient client) {
  46.         this.port = port;
  47.         this.threads = threads;
  48.         this.client = client;
  49.     }

  50.     public void startup() throws Exception {
  51.         // call DefaultNotificationReceiver startup first
  52.         super.startup();

  53.         ServerSocket socket = new ServerSocket(port);
  54.         socket.setReuseAddress(true);
  55.         acceptor = new SocketAcceptor(socket, this,
  56.                 Executors.newFixedThreadPool(threads));
  57.         // start accepting connections via socket
  58.         acceptor.start();
  59.     }

  60.     public void shutdown() throws Exception {
  61.         // stop accepting connections
  62.         try {
  63.             acceptor.stop();
  64.         } finally {
  65.             // shutdown no matter what
  66.             // call DefaultNotificationReceiver shutdown last
  67.             super.shutdown();
  68.         }
  69.     }

  70.     /**
  71.      * Process a line of input.
  72.      *
  73.      * @param line
  74.      *            input
  75.      * @param out
  76.      *            write generated output to stream
  77.      * @throws Exception if misconfigured or the client quits.
  78.      */
  79.     protected void processLine(final String line, final OutputStream out)
  80.             throws Exception {
  81.         if (client == null) {
  82.             throw new Exception("No product client configured");
  83.         }

  84.         String s = line.trim();
  85.         if (s.equals("status")) {
  86.             out.write(getStatus().getBytes());
  87.         } else if (s.startsWith("reprocess")) {
  88.             out.write(("Reprocess not yet supported").getBytes());
  89.             // reprocess(out, s.replace("reprocess", "").split(" "));
  90.         } else if (s.startsWith("search")) {
  91.             out.write(("Search not yet supported").getBytes());
  92.             // search(out, s.replace("search", "").split(" "));
  93.         } else if (s.equals("quit")) {
  94.             throw new Exception("Bye");
  95.         } else {
  96.             out.write(("Help:\n" + "status - show server status\n"
  97.                     + "SOON search [source=SOURCE] [type=TYPE] [code=CODE]\n"
  98.                     + "SOON reprocess listener=LISTENER id=PRODUCTID")
  99.                     .getBytes());
  100.         }
  101.     }

  102.     private String getStatus() {
  103.         StringBuffer buf = new StringBuffer();
  104.         // receiver queue status
  105.         Iterator<NotificationReceiver> iter = client.getReceivers().iterator();
  106.         while (iter.hasNext()) {
  107.             NotificationReceiver receiver = iter.next();
  108.             if (receiver instanceof DefaultNotificationReceiver) {
  109.                 Map<String, Integer> status = ((DefaultNotificationReceiver) receiver)
  110.                         .getQueueStatus();
  111.                 if (status != null) {
  112.                     Iterator<String> queues = status.keySet().iterator();
  113.                     while (queues.hasNext()) {
  114.                         String queue = queues.next();
  115.                         buf.append(queue).append(" = ")
  116.                                 .append(status.get(queue)).append("\n");
  117.                     }
  118.                 }
  119.             }
  120.         }

  121.         String status = buf.toString();
  122.         if (status.equals("")) {
  123.             status = "No queues to show";
  124.         }
  125.         return status;
  126.     }

  127.     public void onSocket(Socket socket) {
  128.         LOGGER.info("[" + getName() + "] accepted connection "
  129.                 + socket.toString());

  130.         InputStream in = null;
  131.         OutputStream out = null;

  132.         try {
  133.             in = socket.getInputStream();
  134.             out = socket.getOutputStream();

  135.             BufferedReader br = new BufferedReader(new InputStreamReader(in));
  136.             String line = null;
  137.             while ((line = br.readLine()) != null) {
  138.                 processLine(line, out);
  139.             }
  140.         } catch (Exception ex) {
  141.             LOGGER.log(Level.WARNING, "[" + getName()
  142.                     + "] exception while processing socket", ex);
  143.             // tell sender "exception"
  144.             try {
  145.                 out.write(("Error receiving product '" + ex.getMessage() + "'")
  146.                         .getBytes());
  147.             } catch (Exception ex2) {
  148.                 LOGGER.log(Level.WARNING, "[" + getName()
  149.                         + "] unable to notify sender of exception", ex2);
  150.             }
  151.         } finally {
  152.             StreamUtils.closeStream(in);
  153.             StreamUtils.closeStream(out);

  154.             try {
  155.                 socket.shutdownInput();
  156.             } catch (Exception e) {
  157.                 // ignore
  158.             }
  159.             try {
  160.                 socket.shutdownOutput();
  161.             } catch (Exception e) {
  162.                 // ignore
  163.             }

  164.             try {
  165.                 socket.close();
  166.             } catch (Exception e) {
  167.                 // ignore
  168.             }
  169.         }

  170.         LOGGER.info("[" + getName() + "] closed connection "
  171.                 + socket.toString());
  172.     }

  173.     /** @return port */
  174.     public int getPort() {
  175.         return port;
  176.     }

  177.     /** @param port port number */
  178.     public void setPort(int port) {
  179.         this.port = port;
  180.     }

  181.     /** @return threads */
  182.     public int getThreads() {
  183.         return threads;
  184.     }

  185.     /** @param threads set number of threads */
  186.     public void setThreads(int threads) {
  187.         this.threads = threads;
  188.     }

  189.     /** @return product client */
  190.     public ProductClient getClient() {
  191.         return client;
  192.     }

  193.     /** @param client set product client */
  194.     public void setClient(ProductClient client) {
  195.         this.client = client;
  196.     }

  197. }