SocketAcceptor.java

  1. /*
  2.  * SocketAcceptor
  3.  *
  4.  * $Id$
  5.  * $HeadURL$
  6.  */
  7. package gov.usgs.util;

  8. import java.net.ServerSocket;
  9. import java.net.Socket;

  10. import java.util.Iterator;
  11. import java.util.LinkedList;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.Executors;
  14. import java.util.concurrent.Future;
  15. import java.util.concurrent.TimeUnit;

  16. /**
  17.  * Accept socket connections from a ServerSocket, and notify a listener using a
  18.  * separate thread.
  19.  *
  20.  * @author jmfee
  21.  *
  22.  */
  23. public class SocketAcceptor implements Runnable {

  24.     /** Track active sockets. */
  25.     private LinkedList<Future<?>> activeSockets = new LinkedList<Future<?>>();

  26.     /** Socket used to accept connections. */
  27.     private ServerSocket listener;

  28.     /** Handler used to process accepted connections. */
  29.     private SocketListenerInterface callback;

  30.     /** Sockets are processed by this executor. */
  31.     private ExecutorService socketExecutor;

  32.     /** Whether to keep accepting connections. */
  33.     private boolean listening = true;

  34.     /**
  35.      * Create a new SocketAcceptor object that uses a single thread executor.
  36.      *
  37.      * @param listener
  38.      *            the server socket to accept connections from.
  39.      * @param callback
  40.      *            the object that processes accepted connections.
  41.      */
  42.     public SocketAcceptor(final ServerSocket listener,
  43.             SocketListenerInterface callback) {
  44.         this(listener, callback, Executors.newSingleThreadExecutor());
  45.     }

  46.     /**
  47.      *
  48.      * @param listener
  49.      *            the server socket to accept connections from.
  50.      * @param callback
  51.      *            the object that processes accepted connections.
  52.      * @param executor
  53.      *            the executor used to invoke callback.
  54.      */
  55.     public SocketAcceptor(final ServerSocket listener,
  56.             SocketListenerInterface callback, ExecutorService executor) {
  57.         this.listener = listener;
  58.         this.callback = callback;
  59.         this.socketExecutor = executor;
  60.     }

  61.     /**
  62.      * Start accepting connections in a background thread.
  63.      */
  64.     public void start() {
  65.         new Thread(this).start();
  66.     }

  67.     /**
  68.      * Stop accepting connections.
  69.      */
  70.     public void stop() {
  71.         // tell accept thread to stop accepting
  72.         listening = false;
  73.         try {
  74.             // close the server socket, will cause exception in blocking accept
  75.             // call
  76.             listener.close();
  77.         } catch (Exception e) {
  78.             // ignore
  79.         }

  80.         // process any queued sockets
  81.         socketExecutor.shutdown();
  82.     }

  83.     /**
  84.      * Accept connections until the shutdown method is called.
  85.      */
  86.     public void run() {
  87.         Socket socket = null;
  88.         while (listening) {
  89.             try {
  90.                 socket = listener.accept();
  91.             } catch (Exception e) {
  92.                 socket = null;
  93.                 if (!listening) {
  94.                     //exception was thrown because socket was closed
  95.                     break;
  96.                 }
  97.                 System.err.println("Error accepting connection");
  98.                 e.printStackTrace();
  99.             }

  100.             // check if this is a valid socket
  101.             if (socket != null) {
  102.                 final Socket threadSocket = socket;
  103.                 final SocketListenerInterface threadCallback = callback;

  104.                 // schedule processing
  105.                 Future<?> socketThread = socketExecutor.submit(new Runnable() {
  106.                     public void run() {
  107.                         try {
  108.                             threadCallback.onSocket(threadSocket);
  109.                         } catch (Exception e) {
  110.                             System.err.println("SocketListener callback threw exception:");
  111.                             e.printStackTrace();
  112.                         }
  113.                     }
  114.                 });
  115.                 //track this to see if a socket is blocking...
  116.                 activeSockets.add(socketThread);
  117.             } else {
  118.                 System.err.println("Socket is null while accepting connection.");
  119.             }

  120.             // see how many sockets are still in the queue
  121.             // but first remove any completed sockets
  122.             Iterator<Future<?>> iter = activeSockets.iterator();
  123.             while (iter.hasNext()) {
  124.                 Future<?> next = iter.next();
  125.                 try {
  126.                     if ( (next.get(0, TimeUnit.MILLISECONDS)) == null ) {
  127.                         iter.remove();
  128.                     }
  129.                 } catch (Exception e) {
  130.                     //ignore
  131.                 }
  132.             }
  133.             System.err.println("There are " + activeSockets.size() + " active/queued socket connections");
  134.         }
  135.     }

  136. }