SocketProductReceiverHandler.java
package gov.usgs.earthquake.distribution;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.BinaryIO;
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.util.SizeLimitInputStream;
import gov.usgs.util.ObjectLock;
import gov.usgs.util.StreamUtils;
/**
 * Handles one incoming connection on behalf of a {@link SocketProductReceiver}.
 *
 * <p>{@link #run()} checks whether the peer starts with the "PDL" protocol
 * prefix, optionally answers whether the announced product is already stored,
 * then passes the remaining stream to the receiver and reports the resulting
 * status (or error) back to the peer.
 */
public class SocketProductReceiverHandler implements Runnable {

  private static final Logger LOGGER =
      Logger.getLogger(SocketProductReceiverHandler.class.getName());

  /**
   * Limit used when reading the PDL protocol version string, and the mark
   * read-limit set on the input before probing for the protocol. Set to 1024.
   */
  public static final int PDL_PROTOCOL_BUFFER = 1024;

  /** Reads and writes protocol strings on the socket streams. */
  protected final BinaryIO io = new BinaryIO();

  /** Receiver that owns this handler; supplies storage, limits and its name for logging. */
  protected final SocketProductReceiver receiver;

  /** Connection being handled. */
  protected final Socket socket;

  /**
   * Protocol version announced by the peer; {@code null} until {@link #run()}
   * has read it, and still {@code null} when the peer does not use the PDL
   * protocol. Controls how {@link #sendString} encodes replies.
   */
  protected String protocolVersion;

  /**
   * Constructor.
   *
   * @param receiver receiver that accepted the connection
   * @param socket connection to process
   */
  public SocketProductReceiverHandler(
      final SocketProductReceiver receiver, final Socket socket) {
    this.receiver = receiver;
    this.socket = socket;
  }

  /**
   * Acquire write lock in receiver storage.
   *
   * <p>Best effort: does nothing when {@code id} is null or when the receiver
   * storage is not a {@link FileProductStorage}; failures are logged at FINE
   * and otherwise ignored.
   *
   * @param id product to lock.
   */
  public void acquireWriteLock(final ProductId id) {
    if (id == null) {
      // mirror releaseWriteLock, which never releases a null id
      return;
    }
    try {
      final ObjectLock<ProductId> storageLocks = getStorageLocks();
      if (storageLocks != null) {
        storageLocks.acquireWriteLock(id);
      }
    } catch (Exception e) {
      restoreInterrupt(e);
      LOGGER.log(Level.FINE, "[" + receiver.getName()
          + "] unable to acquire write lock for " + id, e);
    }
  }

  /**
   * Release write lock in receiver storage.
   *
   * <p>Best effort: does nothing when {@code id} is null or when the receiver
   * storage is not a {@link FileProductStorage}; failures are logged at FINE
   * and otherwise ignored.
   *
   * @param id product to unlock.
   */
  public void releaseWriteLock(final ProductId id) {
    if (id == null) {
      return;
    }
    try {
      final ObjectLock<ProductId> storageLocks = getStorageLocks();
      if (storageLocks != null) {
        storageLocks.releaseWriteLock(id);
      }
    } catch (Exception e) {
      restoreInterrupt(e);
      LOGGER.log(Level.FINE, "[" + receiver.getName()
          + "] unable to release write lock for " + id, e);
    }
  }

  /**
   * Look up the lock manager of the receiver storage.
   *
   * @return the storage locks, or null when the storage is not a
   *     {@link FileProductStorage}
   * @throws Exception whatever the receiver or storage accessors throw
   */
  private ObjectLock<ProductId> getStorageLocks() throws Exception {
    final Object storage = receiver.getProductStorage();
    if (storage instanceof FileProductStorage) {
      return ((FileProductStorage) storage).getStorageLocks();
    }
    return null;
  }

  /** Re-set the thread's interrupt flag when a swallowed exception was an interruption. */
  private static void restoreInterrupt(final Exception e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Whether an exception message mentions "maxLength".
   *
   * <p>NOTE(review): presumably this is how {@code BinaryIO.readString}
   * reports an over-long string - confirm against BinaryIO.
   */
  private static boolean isMaxLengthError(final IOException e) {
    final String message = e.getMessage();
    return message != null && message.contains("maxLength");
  }

  /**
   * Read PDL protocol version.
   *
   * <p>Consumes up to three bytes looking for the literal prefix "PDL"; only
   * when all three match is the version string read.
   *
   * @param in input stream to read
   * @return version, or null if not the PDL protocol.
   *
   * @throws IOException if IO error occurs, or with message
   *     "bad protocol version" when the version string exceeds the limit
   */
  public String readProtocolVersion(final InputStream in) throws IOException {
    if (in.read() != 'P' || in.read() != 'D' || in.read() != 'L') {
      return null;
    }
    try {
      return io.readString(in, PDL_PROTOCOL_BUFFER);
    } catch (IOException e) {
      if (isMaxLengthError(e)) {
        throw new IOException("bad protocol version", e);
      }
      throw e;
    }
  }

  /**
   * Read the product id string that follows a version 0.1 protocol header.
   *
   * @param in input stream positioned after the protocol version
   * @return raw product id string
   * @throws IOException if IO error occurs, or with message
   *     "product id too long" when the string exceeds the limit
   */
  private String readProductIdString(final InputStream in) throws IOException {
    try {
      return io.readString(in, 1024);
    } catch (IOException e) {
      if (isMaxLengthError(e)) {
        throw new IOException("product id too long", e);
      }
      throw e;
    }
  }

  /**
   * Process incoming socket connection.
   *
   * <p>Any exception is reported to the peer via {@link #sendException}. The
   * write lock (if a product id was read), both streams and the socket are
   * always released on exit.
   */
  @Override
  public void run() {
    BufferedInputStream in = null;
    OutputStream out = null;
    ProductId productId = null;

    try {
      socket.setSoTimeout(receiver.getReadTimeout());
      in = new BufferedInputStream(socket.getInputStream());
      out = socket.getOutputStream();

      // mark so the probe bytes can be replayed when the peer is not using PDL
      in.mark(PDL_PROTOCOL_BUFFER);
      protocolVersion = readProtocolVersion(in);
      if (protocolVersion == null) {
        LOGGER.fine("[" + receiver.getName() + "] not using PDL protocol "
            + socket);
        in.reset();
      } else {
        LOGGER.fine("[" + receiver.getName() + "] protocol version '"
            + protocolVersion + "' " + socket);
        // got a version, see if it's supported
        if (!SocketProductSender.PROTOCOL_VERSION_0_1.equals(protocolVersion)) {
          throw new IOException("unsupported protocol version");
        }

        // product id is only message
        productId = ProductId.parse(readProductIdString(in));
        acquireWriteLock(productId);
        if (receiver.getProductStorage().hasProduct(productId)) {
          // have product, don't send
          sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
          return;
        }
        // don't have product
        sendString(out, SocketProductSender.UNKNOWN_PRODUCT);
        out.flush();
      }

      // read product, optionally capped by the receiver's size limit
      InputStream productIn = in;
      if (receiver.getSizeLimit() > 0) {
        productIn = new SizeLimitInputStream(in, receiver.getSizeLimit());
      }
      // wrapped as unclosable so the stream stays open for the reply below
      final String status = receiver.storeAndNotify(
          IOUtil.autoDetectProductSource(
              new StreamUtils.UnclosableInputStream(productIn)));
      LOGGER.info(status + " from " + socket.toString());

      try {
        // tell sender "success"
        if (protocolVersion == null) {
          out.write(status.getBytes());
        } else {
          io.writeString(status, out);
        }
        out.flush();
      } catch (Exception ex) {
        // the product is already stored; failing to reply is not an error
        LOGGER.log(Level.WARNING, "[" + receiver.getName()
            + "] unable to notify sender of success", ex);
      }
    } catch (Exception ex) {
      sendException(out, ex);
    } finally {
      releaseWriteLock(productId);
      StreamUtils.closeStream(in);
      StreamUtils.closeStream(out);
      // streams are still null when setup failed, so close the socket itself
      closeSocket();
    }
  }

  /** Close the socket, logging (not propagating) any failure. */
  private void closeSocket() {
    if (socket == null) {
      return;
    }
    try {
      socket.close();
    } catch (IOException e) {
      LOGGER.log(Level.FINE, "[" + receiver.getName()
          + "] error closing " + socket, e);
    }
  }

  /**
   * Send an exception to the user.
   *
   * <p>A {@link ProductAlreadyInStorageException} (directly or as the cause)
   * is reported as "already have product"; anything else is logged as a
   * warning and reported as a receive error with the exception message.
   * Never throws; nothing is sent when {@code out} is null.
   *
   * @param out output stream where exception message is written
   * @param e exception to write
   */
  public void sendException(final OutputStream out, final Exception e) {
    try {
      final String reply;
      if (e instanceof ProductAlreadyInStorageException
          || e.getCause() instanceof ProductAlreadyInStorageException) {
        LOGGER.info("[" + receiver.getName() + "] product from "
            + socket.toString() + " already in storage");
        reply = SocketProductSender.ALREADY_HAVE_PRODUCT;
      } else {
        // tell sender "exception"
        LOGGER.log(Level.WARNING, "[" + receiver.getName()
            + "] exception while processing socket", e);
        reply = SocketProductSender.RECEIVE_ERROR
            + " '" + e.getMessage() + "'";
      }
      if (out != null) {
        sendString(out, reply);
      }
    } catch (Exception ignored) {
      // best effort: sendString has already logged any IO failure
    }
  }

  /**
   * Send a string to the user.
   *
   * <p>Writes the raw bytes when no protocol version was negotiated,
   * otherwise writes through {@link BinaryIO#writeString}; flushes afterwards.
   *
   * @param out output stream where exception message is written
   * @param str string to write
   * @throws IOException if IO error occurs (logged as a warning, then rethrown)
   */
  public void sendString(final OutputStream out, final String str) throws IOException {
    try {
      if (protocolVersion == null) {
        out.write(str.getBytes());
      } else {
        io.writeString(str, out);
      }
      out.flush();
    } catch (IOException e) {
      LOGGER.log(Level.WARNING, "[" + receiver.getName() +
          "] unable to send message '" + str + "' to " +
          socket.toString(), e);
      throw e;
    }
  }
}