ReplicationStorageListener.java
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.ProcessTimeoutException;
import gov.usgs.util.StringUtils;
import gov.usgs.util.TimeoutProcess;
import gov.usgs.util.TimeoutProcessBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplicationStorageListener extends DefaultStorageListener {
private static final Logger LOGGER = Logger
.getLogger(ReplicationStorageListener.class.getName());
/**
* Name of the property specifying whether to use archive flag on the
* replication.
*/
public static final String ARCHIVE_FLAG_PROPERTY = "archiveSync";
/**
* Name of the property specifying the replication command on the host
* system.
*/
public static final String REPL_CMD_PROPERTY = "rsync";
/**
* Name of property indicating how many times the replication should be
* attempted before considering it a failure.
*/
public static final String REPL_MAX_TRIES_PROPERTY = "maxTries";
/**
* Name of the property specifying how long to wait for the replication to
* complete successfully.
*/
public static final String REPL_TIMEOUT_PROPERTY = "timeout";
/**
* Name of property specifying to which hosts the storage should be
* replicated.
*/
public static final String REPL_HOSTS_PROPERTY = "targetHosts";
/** Default. Use archiving. */
private static final boolean ARCHIVE_FLAG_DEFAULT = true;
/** Default replication command */
private static final String REPL_CMD_DEFAULT = "rsync";
/** Default number of times to try replication. */
private static final int REPL_MAX_TRIES_DEFAULT = 1;
/** Default replication timeout (milliseconds). */
private static final long REPL_TIMEOUT_DEFAULT = 30000L;
/** Default replication hosts. None. */
private static final Map<String, ExecutorService> REPL_HOSTS_DEFAULT = new HashMap<String, ExecutorService>();
private boolean archiveFlag = ARCHIVE_FLAG_DEFAULT;
private String replCmd = REPL_CMD_DEFAULT;
private int replMaxTries = REPL_MAX_TRIES_DEFAULT;
private long replTimeout = REPL_TIMEOUT_DEFAULT;
private Map<String, ExecutorService> replHosts = REPL_HOSTS_DEFAULT;
/**
* Default constructor used when this object is instantiated via
* configuration.
*/
public ReplicationStorageListener() {
}
/**
* Customer initialization of the constructor
* @param archiveFlag Bool flag of what to do on archive
* @param replCmd Replication command on host system
* @param replTimeout Replication in ms
* @param replHosts List of Replication hosts
*/
public ReplicationStorageListener(final boolean archiveFlag,
String replCmd, final long replTimeout, final List<String> replHosts) {
this.archiveFlag = archiveFlag;
this.replCmd = replCmd;
this.replTimeout = replTimeout;
setReplHosts(replHosts);
}
/**
* Set new Replication hosts
* @param replHosts string list of new hosts
*/
protected void setReplHosts(List<String> replHosts) {
this.replHosts = new HashMap<String, ExecutorService>();
Iterator<String> replHostsIter = replHosts.iterator();
while (replHostsIter.hasNext()) {
String replHost = replHostsIter.next();
ExecutorService service = Executors.newSingleThreadExecutor();
this.replHosts.put(replHost, service);
}
}
@Override
public void configure(Config config) {
// -- Configure the archive flag property
try {
String useArchive = config.getProperty(ARCHIVE_FLAG_PROPERTY);
if ("TRUE".equalsIgnoreCase(useArchive)) {
archiveFlag = true;
} else {
archiveFlag = false;
}
} catch (Exception ex) {
LOGGER.warning("[" + getName()
+ "] replicationStorageListener::Archive flag " + ""
+ "misconfigured. Using default.");
archiveFlag = ARCHIVE_FLAG_DEFAULT;
}
// -- Configure the replication command property
try {
replCmd = config.getProperty(REPL_CMD_PROPERTY);
if (replCmd == null || "".equals(replCmd)) {
replCmd = REPL_CMD_DEFAULT;
}
} catch (Exception ex) {
LOGGER.warning("[" + getName()
+ "] replicationStorageListener::Exception "
+ "configuring replication command. (" + ex.getMessage()
+ ")");
}
// -- Configure the replication max tries property
try {
replMaxTries = Integer.parseInt(config
.getProperty(REPL_MAX_TRIES_PROPERTY));
} catch (NumberFormatException npx) {
LOGGER.warning("[" + getName()
+ "] replicationStorageListener::Bad value for "
+ "replication max tries. Using default.");
replTimeout = REPL_MAX_TRIES_DEFAULT;
} catch (NullPointerException npx) {
// User didn't configure timeout. Just use default; no warning.
replTimeout = REPL_MAX_TRIES_DEFAULT;
}
// -- Configure the replication timeout property
try {
replTimeout = Long.parseLong(config
.getProperty(REPL_TIMEOUT_PROPERTY));
} catch (NumberFormatException npx) {
LOGGER.warning("[" + getName()
+ "] replicationStorageListener::Bad value for "
+ "replication timeout. Using default.");
replTimeout = REPL_TIMEOUT_DEFAULT;
} catch (NullPointerException npx) {
// User didn't configure timeout. Just use default; no warning.
replTimeout = REPL_TIMEOUT_DEFAULT;
}
// -- Configure the replication hosts property
try {
setReplHosts(StringUtils.split(
config.getProperty(REPL_HOSTS_PROPERTY), ","));
} catch (Exception ex) {
LOGGER.warning("["
+ getName()
+ "] replicationStorageListener::No replication hosts configured.");
replHosts = REPL_HOSTS_DEFAULT;
}
}
@Override
public void onProductStored(StorageEvent event) throws Exception {
if (!(event.getProductStorage() instanceof FileProductStorage)) {
return; // Can't replicate a non-file product storage
}
LOGGER.info("[" + getName() + "] product stored. Replicating. ("
+ event.getProductId().toString() + ")");
syncProductContents((FileProductStorage) event.getProductStorage(),
event.getProductId(), false);
LOGGER.info("[" + getName() + "] product replicated to remote. ("
+ event.getProductId().toString() + ")");
}
@Override
public void onProductRemoved(StorageEvent event) throws Exception {
if (!(event.getProductStorage() instanceof FileProductStorage)) {
return; // Can't replicate a non-file product storage
}
LOGGER.info("[" + getName() + "] product removed. Replicating. ("
+ event.getProductId().toString() + ")");
syncProductContents((FileProductStorage) event.getProductStorage(),
event.getProductId(), true);
LOGGER.info("[" + getName() + "] product removal replicated. ("
+ event.getProductId().toString() + ")");
}
/**
*
* @param storage FileProductStorage to use as the base directory
* @param id ID of product in storage
* @param deleting Bool flag for deleting
* @throws IOException if IO error occurs
*/
protected void syncProductContents(FileProductStorage storage,
ProductId id, boolean deleting) throws IOException {
final File baseDir = storage.getBaseDirectory();
final String path = storage.getProductPath(id);
Iterator<String> replHostsIter = replHosts.keySet().iterator();
while (replHostsIter.hasNext()) {
final String replHost = replHostsIter.next();
final ExecutorService service = replHosts.get(replHost);
service.submit(new ReplicationTask(createReplicationCommand(
baseDir, path, replHost, deleting), baseDir, replMaxTries,
replTimeout, service));
}
}
/**
* Create the replication command.
*
* @param baseDir
* The directory from which replication will be executed.
* @param path
* The path of the content to replicate
* @param host
* The host string to which content should be replicated. Format
* = user@host:path
* @param deleting
* Flag whether this should be a deleting replication or not
*
* @return The command and arguments as a list suitable for a
* <code>ProcessBuilder</code>.
* @throws IOException if IO error occurs
*/
protected List<String> createReplicationCommand(final File baseDir,
final String path, final String host, final boolean deleting) throws IOException {
// Make sure we are replicating a directory that actually exists
File source = new File(baseDir, path);
while (!source.exists() && !source.getParentFile().equals(baseDir)) {
source = source.getParentFile();
}
// StringBuffer command = new StringBuffer(replCmd);
List<String> command = new ArrayList<String>();
command.add(replCmd);
if (archiveFlag) {
command.add("-a");
}
command.add("-vz");
command.add("--relative");
command.add("-e");
command.add("ssh -o ConnectTimeout=5");
if (deleting) {
// To do a delete we must sync the parent directory and then
// explicitly include the original target directory and exclude
// everything else.
command.add("--delete");
command.add("--include='" + source.getName() + "**'");
command.add("--exclude='*'");
source = source.getParentFile();
} else {
}
command.add("."
+ source.getCanonicalPath().replace(baseDir.getCanonicalPath(),
""));
command.add(host);
return command;
}
protected class ReplicationTask extends Thread {
// Command to execute
private List<String> command = null;
// String representation of command
private String cmdStr = null;
// Working directory from where to execute the command
private File cwd = null;
// Number of times to try replication
private int numTries = 1;
// How long to let the command try for
private long timeout = 1000L;
// Executor service to repeat this task if appropriate
private ExecutorService service = null;
/**
* Constructor of a replication task
* @param command command to execute
* @param cwd Direcetory to execute the command
* @param numTries How many times to try the replication
* @param timeout in ms
* @param service Executor service
*/
public ReplicationTask(final List<String> command, final File cwd,
final int numTries, final long timeout,
final ExecutorService service) {
this.command = command;
this.cwd = cwd;
this.timeout = timeout;
this.numTries = numTries;
this.service = service;
// Command string for easier viewing
StringBuffer buf = new StringBuffer();
Iterator<String> iter = command.iterator();
while (iter.hasNext()) {
buf.append(iter.next()).append(" ");
}
this.cmdStr = buf.toString().trim();
}
public void run() {
try {
TimeoutProcessBuilder builder = new TimeoutProcessBuilder(
timeout, command);
builder.directory(cwd);
TimeoutProcess process = builder.start();
int exitStatus = process.waitFor();
LOGGER.info("[" + getName() + "] command \"" + cmdStr
+ "\" exited with status [" + exitStatus + "]");
if (exitStatus != 0) {
LOGGER.info("[" + getName() + "] command \"" + cmdStr
+ "\" error output: " + new String(process.errorOutput()));
}
} catch (ProcessTimeoutException cex) {
StringBuffer message = new StringBuffer();
message.append("[" + getName() + "] command \"").append(cmdStr)
.append("\" timed out.");
if (numTries > 0) {
// Try again
message.append(" Trying again.");
service.submit(this);
} else {
message.append(" Not retrying.");
}
LOGGER.warning(message.toString());
} catch (IOException iox) {
LOGGER.log(Level.WARNING, iox.getMessage(), iox);
} catch (InterruptedException iex) {
LOGGER.warning(iex.getMessage());
}
}
}
}