SocketProductReceiverHandler.java

  1. package gov.usgs.earthquake.distribution;

  2. import java.io.BufferedInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import java.net.Socket;
  7. import java.util.logging.Level;
  8. import java.util.logging.Logger;

  9. import gov.usgs.earthquake.product.ProductId;
  10. import gov.usgs.earthquake.product.io.BinaryIO;
  11. import gov.usgs.earthquake.product.io.IOUtil;
  12. import gov.usgs.earthquake.util.SizeLimitInputStream;
  13. import gov.usgs.util.ObjectLock;
  14. import gov.usgs.util.StreamUtils;

  15. public class SocketProductReceiverHandler implements Runnable {

  16.     private static final Logger LOGGER = Logger.getLogger(SocketProductReceiverHandler.class.getName());

  17.     /** buffer for PDL protocol. Set to 1024 */
  18.     public static final int PDL_PROTOCOL_BUFFER = 1024;

  19.     /** Protected Variable for BinaryIO */
  20.     protected final BinaryIO io = new BinaryIO();
  21.     /** Protected Variable for SocketProductReceiver */
  22.     protected final SocketProductReceiver receiver;
  23.     /** Protected Variable for Socket */
  24.     protected final Socket socket;
  25.     /** Protected Variable for a string of protocolVersion */
  26.     protected String protocolVersion;

  27.     /**
  28.      * Constructor
  29.      * @param receiver SocketProductReceiver
  30.      * @param socket Socket
  31.      */
  32.     public SocketProductReceiverHandler(final SocketProductReceiver receiver, final Socket socket) {
  33.         this.receiver = receiver;
  34.         this.socket = socket;
  35.     }

  36.     /**
  37.      * Acquire write lock in receiver storage.
  38.      *
  39.      * @param id product to lock.
  40.      */
  41.     public void acquireWriteLock(final ProductId id) {
  42.         try {
  43.             ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage())
  44.                     .getStorageLocks();
  45.             storageLocks.acquireWriteLock(id);
  46.         } catch (Exception e) {
  47.             // ignore
  48.         }
  49.     }

  50.     /**
  51.      * Release write lock in receiver storeage.
  52.      *
  53.      * @param id product to unlock.
  54.      */
  55.     public void releaseWriteLock(final ProductId id) {
  56.         if (id == null) {
  57.             return;
  58.         }

  59.         try {
  60.             ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage())
  61.                     .getStorageLocks();
  62.             storageLocks.releaseWriteLock(id);
  63.         } catch (Exception e) {
  64.             // ignore
  65.         }
  66.     }

  67.     /**
  68.      * Read PDL protocol version.
  69.      *
  70.      * @param in input stream to read
  71.      * @return version, or null if not the PDL protocol.
  72.      *
  73.      * @throws IOException if IO error occurs
  74.      */
  75.     public String readProtocolVersion(final InputStream in) throws IOException {
  76.         String version = null;
  77.         if (in.read() == 'P' && in.read() == 'D' && in.read() == 'L') {
  78.             try {
  79.                 version = io.readString(in, PDL_PROTOCOL_BUFFER);
  80.             } catch (IOException e) {
  81.                 if (e.getMessage().contains("maxLength")) {
  82.                     throw new IOException("bad protocol version");
  83.                 } else {
  84.                     throw e;
  85.                 }
  86.             }
  87.         }
  88.         return version;
  89.     }

  90.     /**
  91.      * Process incoming socket connection.
  92.      */
  93.     @Override
  94.     public void run() {
  95.         BufferedInputStream in = null;
  96.         InputStream productIn = null;
  97.         OutputStream out = null;
  98.         ProductId productId = null;

  99.         try {
  100.             socket.setSoTimeout(receiver.getReadTimeout());
  101.             in = new BufferedInputStream(socket.getInputStream());
  102.             out = socket.getOutputStream();

  103.             in.mark(PDL_PROTOCOL_BUFFER);

  104.             protocolVersion = readProtocolVersion(in);
  105.             if (protocolVersion == null) {
  106.                 LOGGER.fine("[" + receiver.getName() + "] not using PDL protocol "
  107.                         + socket);
  108.                 in.reset();
  109.             } else {
  110.                 LOGGER.fine("[" + receiver.getName() + "] protocol version '"
  111.                         + protocolVersion + "' " + socket);

  112.                 // got a version, see if it's supported
  113.                 if (SocketProductSender.PROTOCOL_VERSION_0_1.equals(protocolVersion)) {
  114.                     // product id is only message
  115.                     String productIdString;
  116.                     try {
  117.                         productIdString = io.readString(in, 1024);
  118.                     } catch (IOException e) {
  119.                         if (e.getMessage().contains("maxLength")) {
  120.                             throw new IOException("version too long");
  121.                         } else {
  122.                             throw e;
  123.                         }
  124.                     }
  125.                     productId = ProductId.parse(productIdString);

  126.                     acquireWriteLock(productId);
  127.                     if (receiver.getProductStorage().hasProduct(productId)) {
  128.                         // have product, don't send
  129.                         sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
  130.                         return;
  131.                     } else {
  132.                         // don't have product
  133.                         sendString(out, SocketProductSender.UNKNOWN_PRODUCT);
  134.                         out.flush();
  135.                     }
  136.                 } else {
  137.                     throw new IOException("unsupported protocol version");
  138.                 }
  139.             }

  140.             // read product
  141.             productIn = in;
  142.             if (receiver.getSizeLimit() > 0) {
  143.                 productIn = new SizeLimitInputStream(in, receiver.getSizeLimit());
  144.             }
  145.             String status = receiver.storeAndNotify(IOUtil
  146.                     .autoDetectProductSource(new StreamUtils.UnclosableInputStream(
  147.                             productIn)));
  148.             LOGGER.info(status + " from " + socket.toString());

  149.             try {
  150.                 // tell sender "success"
  151.                 if (protocolVersion == null) {
  152.                     out.write(status.getBytes());
  153.                 } else {
  154.                     io.writeString(status, out);
  155.                 }
  156.                 out.flush();
  157.             } catch (Exception ex) {
  158.                 LOGGER.log(Level.WARNING, "[" + receiver.getName()
  159.                         + "] unable to notify sender of success", ex);
  160.             }
  161.         } catch (Exception ex) {
  162.             sendException(out, ex);
  163.         } finally {
  164.             releaseWriteLock(productId);

  165.             StreamUtils.closeStream(in);
  166.             StreamUtils.closeStream(out);
  167.         }
  168.     }

  169.     /**
  170.      * Send an exception to the user.
  171.      *
  172.      * @param out output stream where exception message is written
  173.      * @param e exception to write
  174.      */
  175.     public void sendException(final OutputStream out, final Exception e) {
  176.         try {
  177.             if (e instanceof ProductAlreadyInStorageException
  178.                     || e.getCause() instanceof ProductAlreadyInStorageException) {
  179.                 LOGGER.info("[" + receiver.getName() + "] product from "
  180.                         + socket.toString() + " already in storage");
  181.                 sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
  182.             } else {
  183.                 // tell sender "exception"
  184.                 LOGGER.log(Level.WARNING, "[" + receiver.getName()
  185.                         + "] exception while processing socket", e);
  186.                 sendString(out, SocketProductSender.RECEIVE_ERROR +
  187.                         " '" + e.getMessage() + "'");
  188.             }
  189.         } catch (Exception e2) {
  190.             //ignore
  191.         }
  192.     }

  193.     /**
  194.      * Send a string to the user.
  195.      *
  196.      * @param out output stream where exception message is written
  197.      * @param str string to write
  198.      * @throws IOException if IO error occurs
  199.      */
  200.     public void sendString(final OutputStream out, final String str) throws IOException {
  201.         try {
  202.             if (protocolVersion == null) {
  203.                 out.write(str.getBytes());
  204.             } else {
  205.                 io.writeString(str, out);
  206.             }
  207.             out.flush();
  208.         } catch (IOException e) {
  209.             LOGGER.log(Level.WARNING, "[" + receiver.getName() +
  210.                     "] unable to send message '" + str + "' to " +
  211.                     socket.toString(), e);
  212.             throw e;
  213.         }
  214.     }

  215. }