SocketProductReceiverHandler.java
- package gov.usgs.earthquake.distribution;
- import java.io.BufferedInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.Socket;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.earthquake.product.io.BinaryIO;
- import gov.usgs.earthquake.product.io.IOUtil;
- import gov.usgs.earthquake.util.SizeLimitInputStream;
- import gov.usgs.util.ObjectLock;
- import gov.usgs.util.StreamUtils;
- public class SocketProductReceiverHandler implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(SocketProductReceiverHandler.class.getName());
- /** buffer for PDL protocol. Set to 1024 */
- public static final int PDL_PROTOCOL_BUFFER = 1024;
- /** Protected Variable for BinaryIO */
- protected final BinaryIO io = new BinaryIO();
- /** Protected Variable for SocketProductReceiver */
- protected final SocketProductReceiver receiver;
- /** Protected Variable for Socket */
- protected final Socket socket;
- /** Protected Variable for a string of protocolVersion */
- protected String protocolVersion;
- /**
- * Constructor
- * @param receiver SocketProductReceiver
- * @param socket Socket
- */
- public SocketProductReceiverHandler(final SocketProductReceiver receiver, final Socket socket) {
- this.receiver = receiver;
- this.socket = socket;
- }
- /**
- * Acquire write lock in receiver storage.
- *
- * @param id product to lock.
- */
- public void acquireWriteLock(final ProductId id) {
- try {
- ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage())
- .getStorageLocks();
- storageLocks.acquireWriteLock(id);
- } catch (Exception e) {
- // ignore
- }
- }
- /**
- * Release write lock in receiver storeage.
- *
- * @param id product to unlock.
- */
- public void releaseWriteLock(final ProductId id) {
- if (id == null) {
- return;
- }
- try {
- ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage())
- .getStorageLocks();
- storageLocks.releaseWriteLock(id);
- } catch (Exception e) {
- // ignore
- }
- }
- /**
- * Read PDL protocol version.
- *
- * @param in input stream to read
- * @return version, or null if not the PDL protocol.
- *
- * @throws IOException if IO error occurs
- */
- public String readProtocolVersion(final InputStream in) throws IOException {
- String version = null;
- if (in.read() == 'P' && in.read() == 'D' && in.read() == 'L') {
- try {
- version = io.readString(in, PDL_PROTOCOL_BUFFER);
- } catch (IOException e) {
- if (e.getMessage().contains("maxLength")) {
- throw new IOException("bad protocol version");
- } else {
- throw e;
- }
- }
- }
- return version;
- }
- /**
- * Process incoming socket connection.
- */
- @Override
- public void run() {
- BufferedInputStream in = null;
- InputStream productIn = null;
- OutputStream out = null;
- ProductId productId = null;
- try {
- socket.setSoTimeout(receiver.getReadTimeout());
- in = new BufferedInputStream(socket.getInputStream());
- out = socket.getOutputStream();
- in.mark(PDL_PROTOCOL_BUFFER);
- protocolVersion = readProtocolVersion(in);
- if (protocolVersion == null) {
- LOGGER.fine("[" + receiver.getName() + "] not using PDL protocol "
- + socket);
- in.reset();
- } else {
- LOGGER.fine("[" + receiver.getName() + "] protocol version '"
- + protocolVersion + "' " + socket);
- // got a version, see if it's supported
- if (SocketProductSender.PROTOCOL_VERSION_0_1.equals(protocolVersion)) {
- // product id is only message
- String productIdString;
- try {
- productIdString = io.readString(in, 1024);
- } catch (IOException e) {
- if (e.getMessage().contains("maxLength")) {
- throw new IOException("version too long");
- } else {
- throw e;
- }
- }
- productId = ProductId.parse(productIdString);
- acquireWriteLock(productId);
- if (receiver.getProductStorage().hasProduct(productId)) {
- // have product, don't send
- sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
- return;
- } else {
- // don't have product
- sendString(out, SocketProductSender.UNKNOWN_PRODUCT);
- out.flush();
- }
- } else {
- throw new IOException("unsupported protocol version");
- }
- }
- // read product
- productIn = in;
- if (receiver.getSizeLimit() > 0) {
- productIn = new SizeLimitInputStream(in, receiver.getSizeLimit());
- }
- String status = receiver.storeAndNotify(IOUtil
- .autoDetectProductSource(new StreamUtils.UnclosableInputStream(
- productIn)));
- LOGGER.info(status + " from " + socket.toString());
- try {
- // tell sender "success"
- if (protocolVersion == null) {
- out.write(status.getBytes());
- } else {
- io.writeString(status, out);
- }
- out.flush();
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[" + receiver.getName()
- + "] unable to notify sender of success", ex);
- }
- } catch (Exception ex) {
- sendException(out, ex);
- } finally {
- releaseWriteLock(productId);
- StreamUtils.closeStream(in);
- StreamUtils.closeStream(out);
- }
- }
- /**
- * Send an exception to the user.
- *
- * @param out output stream where exception message is written
- * @param e exception to write
- */
- public void sendException(final OutputStream out, final Exception e) {
- try {
- if (e instanceof ProductAlreadyInStorageException
- || e.getCause() instanceof ProductAlreadyInStorageException) {
- LOGGER.info("[" + receiver.getName() + "] product from "
- + socket.toString() + " already in storage");
- sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
- } else {
- // tell sender "exception"
- LOGGER.log(Level.WARNING, "[" + receiver.getName()
- + "] exception while processing socket", e);
- sendString(out, SocketProductSender.RECEIVE_ERROR +
- " '" + e.getMessage() + "'");
- }
- } catch (Exception e2) {
- //ignore
- }
- }
- /**
- * Send a string to the user.
- *
- * @param out output stream where exception message is written
- * @param str string to write
- * @throws IOException if IO error occurs
- */
- public void sendString(final OutputStream out, final String str) throws IOException {
- try {
- if (protocolVersion == null) {
- out.write(str.getBytes());
- } else {
- io.writeString(str, out);
- }
- out.flush();
- } catch (IOException e) {
- LOGGER.log(Level.WARNING, "[" + receiver.getName() +
- "] unable to send message '" + str + "' to " +
- socket.toString(), e);
- throw e;
- }
- }
- }