AdminSocketServer.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.util.DefaultConfigurable;
- import gov.usgs.util.SocketAcceptor;
- import gov.usgs.util.SocketListenerInterface;
- import gov.usgs.util.StreamUtils;
- import java.io.BufferedReader;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.io.OutputStream;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.concurrent.Executors;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * Telnet to this socket to get a "command prompt".
- *
- * @author jmfee
- */
- public class AdminSocketServer extends DefaultConfigurable implements
- SocketListenerInterface {
- private static final Logger LOGGER = Logger
- .getLogger(AdminSocketServer.class.getName());
- /** Variable for default thread pool size */
- private static final int DEFAULT_THREAD_POOL_SIZE = 10;
- /** Variable for default admin port */
- private static final int DEFAULT_ADMIN_PORT = 11111;
- private int port = -1;
- private int threads = -1;
- private SocketAcceptor acceptor = null;
- /** the client this server is providing stats for. */
- private ProductClient client = null;
- /** Initializes socket with default thread pool size and port */
- public AdminSocketServer() {
- this(DEFAULT_ADMIN_PORT, DEFAULT_THREAD_POOL_SIZE, null);
- }
- /** Initializes socket with custom port, threads, and client
- * @param port Admind port
- * @param threads Thread pool size
- * @param client Product Client
- */
- public AdminSocketServer(final int port, final int threads,
- final ProductClient client) {
- this.port = port;
- this.threads = threads;
- this.client = client;
- }
- public void startup() throws Exception {
- // call DefaultNotificationReceiver startup first
- super.startup();
- ServerSocket socket = new ServerSocket(port);
- socket.setReuseAddress(true);
- acceptor = new SocketAcceptor(socket, this,
- Executors.newFixedThreadPool(threads));
- // start accepting connections via socket
- acceptor.start();
- }
- public void shutdown() throws Exception {
- // stop accepting connections
- try {
- acceptor.stop();
- } finally {
- // shutdown no matter what
- // call DefaultNotificationReceiver shutdown last
- super.shutdown();
- }
- }
- /**
- * Process a line of input.
- *
- * @param line
- * input
- * @param out
- * write generated output to stream
- * @throws Exception if misconfigured or the client quits.
- */
- protected void processLine(final String line, final OutputStream out)
- throws Exception {
- if (client == null) {
- throw new Exception("No product client configured");
- }
- String s = line.trim();
- if (s.equals("status")) {
- out.write(getStatus().getBytes());
- } else if (s.startsWith("reprocess")) {
- out.write(("Reprocess not yet supported").getBytes());
- // reprocess(out, s.replace("reprocess", "").split(" "));
- } else if (s.startsWith("search")) {
- out.write(("Search not yet supported").getBytes());
- // search(out, s.replace("search", "").split(" "));
- } else if (s.equals("quit")) {
- throw new Exception("Bye");
- } else {
- out.write(("Help:\n" + "status - show server status\n"
- + "SOON search [source=SOURCE] [type=TYPE] [code=CODE]\n"
- + "SOON reprocess listener=LISTENER id=PRODUCTID")
- .getBytes());
- }
- }
- private String getStatus() {
- StringBuffer buf = new StringBuffer();
- // receiver queue status
- Iterator<NotificationReceiver> iter = client.getReceivers().iterator();
- while (iter.hasNext()) {
- NotificationReceiver receiver = iter.next();
- if (receiver instanceof DefaultNotificationReceiver) {
- Map<String, Integer> status = ((DefaultNotificationReceiver) receiver)
- .getQueueStatus();
- if (status != null) {
- Iterator<String> queues = status.keySet().iterator();
- while (queues.hasNext()) {
- String queue = queues.next();
- buf.append(queue).append(" = ")
- .append(status.get(queue)).append("\n");
- }
- }
- }
- }
- String status = buf.toString();
- if (status.equals("")) {
- status = "No queues to show";
- }
- return status;
- }
- public void onSocket(Socket socket) {
- LOGGER.info("[" + getName() + "] accepted connection "
- + socket.toString());
- InputStream in = null;
- OutputStream out = null;
- try {
- in = socket.getInputStream();
- out = socket.getOutputStream();
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- String line = null;
- while ((line = br.readLine()) != null) {
- processLine(line, out);
- }
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception while processing socket", ex);
- // tell sender "exception"
- try {
- out.write(("Error receiving product '" + ex.getMessage() + "'")
- .getBytes());
- } catch (Exception ex2) {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] unable to notify sender of exception", ex2);
- }
- } finally {
- StreamUtils.closeStream(in);
- StreamUtils.closeStream(out);
- try {
- socket.shutdownInput();
- } catch (Exception e) {
- // ignore
- }
- try {
- socket.shutdownOutput();
- } catch (Exception e) {
- // ignore
- }
- try {
- socket.close();
- } catch (Exception e) {
- // ignore
- }
- }
- LOGGER.info("[" + getName() + "] closed connection "
- + socket.toString());
- }
- /** @return port */
- public int getPort() {
- return port;
- }
- /** @param port port number */
- public void setPort(int port) {
- this.port = port;
- }
- /** @return threads */
- public int getThreads() {
- return threads;
- }
- /** @param threads set number of threads */
- public void setThreads(int threads) {
- this.threads = threads;
- }
- /** @return product client */
- public ProductClient getClient() {
- return client;
- }
- /** @param client set product client */
- public void setClient(ProductClient client) {
- this.client = client;
- }
- }