SocketProductReceiver.java
/*
* SocketProductReceiver
*/
package gov.usgs.earthquake.distribution;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.util.Config;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
/**
* Receive Products directly via a Socket.
*
* The received products are sent using a SocketProductSender.
*
* A SocketProductReceiver receives products directly and notifies listeners of
* received notifications.
*
* These are typically used on hubs with an EIDSNotificationSender or
* RelayProductReceiver.
*
* The NotificationReceiver uses a NotificationIndex to track received
* notifications, and a ProductStorage to store retrieved products.
*
* The DefaultNotificationReceiver implements the Configurable interface and
* uses the following configuration parameters:
*
* Each listener has a separate queue of notifications. Each listener is
* allocated one thread to process notifications from this queue.
*/
public class SocketProductReceiver extends DefaultNotificationReceiver
implements SocketListenerInterface {
private static final String THREAD_POOL_SIZE_PROPERTY = "threads";
private static final String DEFAULT_THREAD_POOL_SIZE = "10";
private static final String PRODUCT_PORT_PROPERTY = "port";
private static final String DEFAULT_PRODUCT_PORT = "11235";
private static final String SIZE_LIMIT_PROPERTY = "sizeLimit";
private static final String DEFAULT_SIZE_LIMIT = "-1";
private static final Logger LOGGER = Logger
.getLogger(SocketProductReceiver.class.getName());
private int port = -1;
private int threads = -1;
private long sizeLimit = -1;
private SocketAcceptor acceptor = null;
/**
* Default constructor setting port, threads, and sizeLimit to default
* @throws Exception if error occurs
*/
public SocketProductReceiver() throws Exception {
super();
this.port = Integer.parseInt(DEFAULT_PRODUCT_PORT);
this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
this.sizeLimit = Long.parseLong(DEFAULT_SIZE_LIMIT);
}
/**
* Constructor based on config file
* @param config Configuration file
* @throws Exception if error occurs
*/
public SocketProductReceiver(Config config) throws Exception {
this();
configure(config);
}
public void configure(Config config) throws Exception {
super.configure(config);
this.port = Integer.parseInt(config.getProperty(PRODUCT_PORT_PROPERTY,
DEFAULT_PRODUCT_PORT));
LOGGER.config("[" + getName() + "] port is '" + this.port + "'");
this.threads = Integer.parseInt(config.getProperty(
THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
LOGGER.config("[" + getName() + "] number of threads is '"
+ this.threads + "'");
this.sizeLimit = Long.parseLong(config.getProperty(
SIZE_LIMIT_PROPERTY, DEFAULT_SIZE_LIMIT));
LOGGER.config("[" + getName() + "] size limite is '"
+ this.sizeLimit + "'");
}
public void startup() throws Exception {
// call DefaultNotificationReceiver startup first
super.startup();
ServerSocket socket = new ServerSocket(port);
socket.setReuseAddress(true);
acceptor = new SocketAcceptor(socket, this,
Executors.newFixedThreadPool(threads));
// start accepting connections via socket
acceptor.start();
}
public void shutdown() throws Exception {
// stop accepting connections
acceptor.stop();
// call DefaultNotificationReceiver shutdown last
super.shutdown();
}
public void onSocket(Socket socket) {
LOGGER.info("[" + getName() + "] accepted connection "
+ socket.toString());
try {
new SocketProductReceiverHandler(this, socket).run();
} catch (Exception e) {
LOGGER.log(Level.WARNING,
"[" + getName() + "] uncaught exception processing "
+ socket.toString(), e);
} finally {
try {
socket.shutdownInput();
} catch (Exception e) {
// ignore
}
try {
socket.shutdownOutput();
} catch (Exception e) {
// ignore
}
try {
socket.close();
} catch (Exception e) {
// ignore
}
}
LOGGER.info("[" + getName() + "] closed connection "
+ socket.toString());
}
/**
* Stores ProductSource as a notification, tracks it, and notifies Listeners
* @param source ProductSource
* @return String note for log file
* @throws Exception if error occurs
*/
protected String storeAndNotify(final ProductSource source)
throws Exception {
Notification notification = storeProductSource(source);
if (notification != null) {
// note in log file
String message = "[" + getName() + "] received product '"
+ notification.getProductId().toString() + "'\n";
// product successfully stored, and notification index updated.
new ProductTracker(notification.getTrackerURL()).productReceived(
this.getName(), notification.getProductId());
// notify listeners of newly available product.
notifyListeners(notification);
return message;
} else {
throw new Exception("[" + getName()
+ "] unknown error, no notification generated");
}
}
/** @return port */
public int getPort() {
return port;
}
/** @param port int to set */
public void setPort(int port) {
this.port = port;
}
/** @return sizeLimit */
public long getSizeLimit() {
return sizeLimit;
}
/** @param sizeLimit long to set */
public void setSizeLimit(long sizeLimit) {
this.sizeLimit = sizeLimit;
}
/** @return threads */
public int getThreads() {
return threads;
}
/** @param threads int to set */
public void setThreads(int threads) {
this.threads = threads;
}
}