SocketAcceptor.java
/*
* SocketAcceptor
*
* $Id$
* $HeadURL$
*/
package gov.usgs.util;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Accept socket connections from a ServerSocket, and notify a listener using a
* separate thread.
*
* @author jmfee
*
*/
public class SocketAcceptor implements Runnable {
/** Track active sockets. */
private LinkedList<Future<?>> activeSockets = new LinkedList<Future<?>>();
/** Socket used to accept connections. */
private ServerSocket listener;
/** Handler used to process accepted connections. */
private SocketListenerInterface callback;
/** Sockets are processed by this executor. */
private ExecutorService socketExecutor;
/** Whether to keep accepting connections. */
private boolean listening = true;
/**
* Create a new SocketAcceptor object that uses a single thread executor.
*
* @param listener
* the server socket to accept connections from.
* @param callback
* the object that processes accepted connections.
*/
public SocketAcceptor(final ServerSocket listener,
SocketListenerInterface callback) {
this(listener, callback, Executors.newSingleThreadExecutor());
}
/**
*
* @param listener
* the server socket to accept connections from.
* @param callback
* the object that processes accepted connections.
* @param executor
* the executor used to invoke callback.
*/
public SocketAcceptor(final ServerSocket listener,
SocketListenerInterface callback, ExecutorService executor) {
this.listener = listener;
this.callback = callback;
this.socketExecutor = executor;
}
/**
* Start accepting connections in a background thread.
*/
public void start() {
new Thread(this).start();
}
/**
* Stop accepting connections.
*/
public void stop() {
// tell accept thread to stop accepting
listening = false;
try {
// close the server socket, will cause exception in blocking accept
// call
listener.close();
} catch (Exception e) {
// ignore
}
// process any queued sockets
socketExecutor.shutdown();
}
/**
* Accept connections until the shutdown method is called.
*/
public void run() {
Socket socket = null;
while (listening) {
try {
socket = listener.accept();
} catch (Exception e) {
socket = null;
if (!listening) {
//exception was thrown because socket was closed
break;
}
System.err.println("Error accepting connection");
e.printStackTrace();
}
// check if this is a valid socket
if (socket != null) {
final Socket threadSocket = socket;
final SocketListenerInterface threadCallback = callback;
// schedule processing
Future<?> socketThread = socketExecutor.submit(new Runnable() {
public void run() {
try {
threadCallback.onSocket(threadSocket);
} catch (Exception e) {
System.err.println("SocketListener callback threw exception:");
e.printStackTrace();
}
}
});
//track this to see if a socket is blocking...
activeSockets.add(socketThread);
} else {
System.err.println("Socket is null while accepting connection.");
}
// see how many sockets are still in the queue
// but first remove any completed sockets
Iterator<Future<?>> iter = activeSockets.iterator();
while (iter.hasNext()) {
Future<?> next = iter.next();
try {
if ( (next.get(0, TimeUnit.MILLISECONDS)) == null ) {
iter.remove();
}
} catch (Exception e) {
//ignore
}
}
System.err.println("There are " + activeSockets.size() + " active/queued socket connections");
}
}
}