// AdminSocketServer.java
package gov.usgs.earthquake.distribution;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
import gov.usgs.util.StreamUtils;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Telnet to this socket to get a "command prompt".
*
* @author jmfee
*/
public class AdminSocketServer extends DefaultConfigurable implements
SocketListenerInterface {
private static final Logger LOGGER = Logger
.getLogger(AdminSocketServer.class.getName());
/** Variable for default thread pool size */
private static final int DEFAULT_THREAD_POOL_SIZE = 10;
/** Variable for default admin port */
private static final int DEFAULT_ADMIN_PORT = 11111;
private int port = -1;
private int threads = -1;
private SocketAcceptor acceptor = null;
/** the client this server is providing stats for. */
private ProductClient client = null;
/** Initializes socket with default thread pool size and port */
public AdminSocketServer() {
this(DEFAULT_ADMIN_PORT, DEFAULT_THREAD_POOL_SIZE, null);
}
/** Initializes socket with custom port, threads, and client
* @param port Admind port
* @param threads Thread pool size
* @param client Product Client
*/
public AdminSocketServer(final int port, final int threads,
final ProductClient client) {
this.port = port;
this.threads = threads;
this.client = client;
}
public void startup() throws Exception {
// call DefaultNotificationReceiver startup first
super.startup();
ServerSocket socket = new ServerSocket(port);
socket.setReuseAddress(true);
acceptor = new SocketAcceptor(socket, this,
Executors.newFixedThreadPool(threads));
// start accepting connections via socket
acceptor.start();
}
public void shutdown() throws Exception {
// stop accepting connections
try {
acceptor.stop();
} finally {
// shutdown no matter what
// call DefaultNotificationReceiver shutdown last
super.shutdown();
}
}
/**
* Process a line of input.
*
* @param line
* input
* @param out
* write generated output to stream
* @throws Exception if misconfigured or the client quits.
*/
protected void processLine(final String line, final OutputStream out)
throws Exception {
if (client == null) {
throw new Exception("No product client configured");
}
String s = line.trim();
if (s.equals("status")) {
out.write(getStatus().getBytes());
} else if (s.startsWith("reprocess")) {
out.write(("Reprocess not yet supported").getBytes());
// reprocess(out, s.replace("reprocess", "").split(" "));
} else if (s.startsWith("search")) {
out.write(("Search not yet supported").getBytes());
// search(out, s.replace("search", "").split(" "));
} else if (s.equals("quit")) {
throw new Exception("Bye");
} else {
out.write(("Help:\n" + "status - show server status\n"
+ "SOON search [source=SOURCE] [type=TYPE] [code=CODE]\n"
+ "SOON reprocess listener=LISTENER id=PRODUCTID")
.getBytes());
}
}
private String getStatus() {
StringBuffer buf = new StringBuffer();
// receiver queue status
Iterator<NotificationReceiver> iter = client.getReceivers().iterator();
while (iter.hasNext()) {
NotificationReceiver receiver = iter.next();
if (receiver instanceof DefaultNotificationReceiver) {
Map<String, Integer> status = ((DefaultNotificationReceiver) receiver)
.getQueueStatus();
if (status != null) {
Iterator<String> queues = status.keySet().iterator();
while (queues.hasNext()) {
String queue = queues.next();
buf.append(queue).append(" = ")
.append(status.get(queue)).append("\n");
}
}
}
}
String status = buf.toString();
if (status.equals("")) {
status = "No queues to show";
}
return status;
}
public void onSocket(Socket socket) {
LOGGER.info("[" + getName() + "] accepted connection "
+ socket.toString());
InputStream in = null;
OutputStream out = null;
try {
in = socket.getInputStream();
out = socket.getOutputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line = null;
while ((line = br.readLine()) != null) {
processLine(line, out);
}
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "[" + getName()
+ "] exception while processing socket", ex);
// tell sender "exception"
try {
out.write(("Error receiving product '" + ex.getMessage() + "'")
.getBytes());
} catch (Exception ex2) {
LOGGER.log(Level.WARNING, "[" + getName()
+ "] unable to notify sender of exception", ex2);
}
} finally {
StreamUtils.closeStream(in);
StreamUtils.closeStream(out);
try {
socket.shutdownInput();
} catch (Exception e) {
// ignore
}
try {
socket.shutdownOutput();
} catch (Exception e) {
// ignore
}
try {
socket.close();
} catch (Exception e) {
// ignore
}
}
LOGGER.info("[" + getName() + "] closed connection "
+ socket.toString());
}
/** @return port */
public int getPort() {
return port;
}
/** @param port port number */
public void setPort(int port) {
this.port = port;
}
/** @return threads */
public int getThreads() {
return threads;
}
/** @param threads set number of threads */
public void setThreads(int threads) {
this.threads = threads;
}
/** @return product client */
public ProductClient getClient() {
return client;
}
/** @param client set product client */
public void setClient(ProductClient client) {
this.client = client;
}
}