ReplicationStorageListener.java
- package gov.usgs.earthquake.distribution;
- import gov.usgs.earthquake.product.ProductId;
- import gov.usgs.util.Config;
- import gov.usgs.util.ProcessTimeoutException;
- import gov.usgs.util.StringUtils;
- import gov.usgs.util.TimeoutProcess;
- import gov.usgs.util.TimeoutProcessBuilder;
- import java.io.File;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- public class ReplicationStorageListener extends DefaultStorageListener {
- private static final Logger LOGGER = Logger
- .getLogger(ReplicationStorageListener.class.getName());
- /**
- * Name of the property specifying whether to use archive flag on the
- * replication.
- */
- public static final String ARCHIVE_FLAG_PROPERTY = "archiveSync";
- /**
- * Name of the property specifying the replication command on the host
- * system.
- */
- public static final String REPL_CMD_PROPERTY = "rsync";
- /**
- * Name of property indicating how many times the replication should be
- * attempted before considering it a failure.
- */
- public static final String REPL_MAX_TRIES_PROPERTY = "maxTries";
- /**
- * Name of the property specifying how long to wait for the replication to
- * complete successfully.
- */
- public static final String REPL_TIMEOUT_PROPERTY = "timeout";
- /**
- * Name of property specifying to which hosts the storage should be
- * replicated.
- */
- public static final String REPL_HOSTS_PROPERTY = "targetHosts";
- /** Default. Use archiving. */
- private static final boolean ARCHIVE_FLAG_DEFAULT = true;
- /** Default replication command */
- private static final String REPL_CMD_DEFAULT = "rsync";
- /** Default number of times to try replication. */
- private static final int REPL_MAX_TRIES_DEFAULT = 1;
- /** Default replication timeout (milliseconds). */
- private static final long REPL_TIMEOUT_DEFAULT = 30000L;
- /** Default replication hosts. None. */
- private static final Map<String, ExecutorService> REPL_HOSTS_DEFAULT = new HashMap<String, ExecutorService>();
- private boolean archiveFlag = ARCHIVE_FLAG_DEFAULT;
- private String replCmd = REPL_CMD_DEFAULT;
- private int replMaxTries = REPL_MAX_TRIES_DEFAULT;
- private long replTimeout = REPL_TIMEOUT_DEFAULT;
- private Map<String, ExecutorService> replHosts = REPL_HOSTS_DEFAULT;
- /**
- * Default constructor used when this object is instantiated via
- * configuration.
- */
- public ReplicationStorageListener() {
- }
- /**
- * Customer initialization of the constructor
- * @param archiveFlag Bool flag of what to do on archive
- * @param replCmd Replication command on host system
- * @param replTimeout Replication in ms
- * @param replHosts List of Replication hosts
- */
- public ReplicationStorageListener(final boolean archiveFlag,
- String replCmd, final long replTimeout, final List<String> replHosts) {
- this.archiveFlag = archiveFlag;
- this.replCmd = replCmd;
- this.replTimeout = replTimeout;
- setReplHosts(replHosts);
- }
- /**
- * Set new Replication hosts
- * @param replHosts string list of new hosts
- */
- protected void setReplHosts(List<String> replHosts) {
- this.replHosts = new HashMap<String, ExecutorService>();
- Iterator<String> replHostsIter = replHosts.iterator();
- while (replHostsIter.hasNext()) {
- String replHost = replHostsIter.next();
- ExecutorService service = Executors.newSingleThreadExecutor();
- this.replHosts.put(replHost, service);
- }
- }
- @Override
- public void configure(Config config) {
- // -- Configure the archive flag property
- try {
- String useArchive = config.getProperty(ARCHIVE_FLAG_PROPERTY);
- if ("TRUE".equalsIgnoreCase(useArchive)) {
- archiveFlag = true;
- } else {
- archiveFlag = false;
- }
- } catch (Exception ex) {
- LOGGER.warning("[" + getName()
- + "] replicationStorageListener::Archive flag " + ""
- + "misconfigured. Using default.");
- archiveFlag = ARCHIVE_FLAG_DEFAULT;
- }
- // -- Configure the replication command property
- try {
- replCmd = config.getProperty(REPL_CMD_PROPERTY);
- if (replCmd == null || "".equals(replCmd)) {
- replCmd = REPL_CMD_DEFAULT;
- }
- } catch (Exception ex) {
- LOGGER.warning("[" + getName()
- + "] replicationStorageListener::Exception "
- + "configuring replication command. (" + ex.getMessage()
- + ")");
- }
- // -- Configure the replication max tries property
- try {
- replMaxTries = Integer.parseInt(config
- .getProperty(REPL_MAX_TRIES_PROPERTY));
- } catch (NumberFormatException npx) {
- LOGGER.warning("[" + getName()
- + "] replicationStorageListener::Bad value for "
- + "replication max tries. Using default.");
- replTimeout = REPL_MAX_TRIES_DEFAULT;
- } catch (NullPointerException npx) {
- // User didn't configure timeout. Just use default; no warning.
- replTimeout = REPL_MAX_TRIES_DEFAULT;
- }
- // -- Configure the replication timeout property
- try {
- replTimeout = Long.parseLong(config
- .getProperty(REPL_TIMEOUT_PROPERTY));
- } catch (NumberFormatException npx) {
- LOGGER.warning("[" + getName()
- + "] replicationStorageListener::Bad value for "
- + "replication timeout. Using default.");
- replTimeout = REPL_TIMEOUT_DEFAULT;
- } catch (NullPointerException npx) {
- // User didn't configure timeout. Just use default; no warning.
- replTimeout = REPL_TIMEOUT_DEFAULT;
- }
- // -- Configure the replication hosts property
- try {
- setReplHosts(StringUtils.split(
- config.getProperty(REPL_HOSTS_PROPERTY), ","));
- } catch (Exception ex) {
- LOGGER.warning("["
- + getName()
- + "] replicationStorageListener::No replication hosts configured.");
- replHosts = REPL_HOSTS_DEFAULT;
- }
- }
- @Override
- public void onProductStored(StorageEvent event) throws Exception {
- if (!(event.getProductStorage() instanceof FileProductStorage)) {
- return; // Can't replicate a non-file product storage
- }
- LOGGER.info("[" + getName() + "] product stored. Replicating. ("
- + event.getProductId().toString() + ")");
- syncProductContents((FileProductStorage) event.getProductStorage(),
- event.getProductId(), false);
- LOGGER.info("[" + getName() + "] product replicated to remote. ("
- + event.getProductId().toString() + ")");
- }
- @Override
- public void onProductRemoved(StorageEvent event) throws Exception {
- if (!(event.getProductStorage() instanceof FileProductStorage)) {
- return; // Can't replicate a non-file product storage
- }
- LOGGER.info("[" + getName() + "] product removed. Replicating. ("
- + event.getProductId().toString() + ")");
- syncProductContents((FileProductStorage) event.getProductStorage(),
- event.getProductId(), true);
- LOGGER.info("[" + getName() + "] product removal replicated. ("
- + event.getProductId().toString() + ")");
- }
- /**
- *
- * @param storage FileProductStorage to use as the base directory
- * @param id ID of product in storage
- * @param deleting Bool flag for deleting
- * @throws IOException if IO error occurs
- */
- protected void syncProductContents(FileProductStorage storage,
- ProductId id, boolean deleting) throws IOException {
- final File baseDir = storage.getBaseDirectory();
- final String path = storage.getProductPath(id);
- Iterator<String> replHostsIter = replHosts.keySet().iterator();
- while (replHostsIter.hasNext()) {
- final String replHost = replHostsIter.next();
- final ExecutorService service = replHosts.get(replHost);
- service.submit(new ReplicationTask(createReplicationCommand(
- baseDir, path, replHost, deleting), baseDir, replMaxTries,
- replTimeout, service));
- }
- }
- /**
- * Create the replication command.
- *
- * @param baseDir
- * The directory from which replication will be executed.
- * @param path
- * The path of the content to replicate
- * @param host
- * The host string to which content should be replicated. Format
- * = user@host:path
- * @param deleting
- * Flag whether this should be a deleting replication or not
- *
- * @return The command and arguments as a list suitable for a
- * <code>ProcessBuilder</code>.
- * @throws IOException if IO error occurs
- */
- protected List<String> createReplicationCommand(final File baseDir,
- final String path, final String host, final boolean deleting) throws IOException {
- // Make sure we are replicating a directory that actually exists
- File source = new File(baseDir, path);
- while (!source.exists() && !source.getParentFile().equals(baseDir)) {
- source = source.getParentFile();
- }
- // StringBuffer command = new StringBuffer(replCmd);
- List<String> command = new ArrayList<String>();
- command.add(replCmd);
- if (archiveFlag) {
- command.add("-a");
- }
- command.add("-vz");
- command.add("--relative");
- command.add("-e");
- command.add("ssh -o ConnectTimeout=5");
- if (deleting) {
- // To do a delete we must sync the parent directory and then
- // explicitly include the original target directory and exclude
- // everything else.
- command.add("--delete");
- command.add("--include='" + source.getName() + "**'");
- command.add("--exclude='*'");
- source = source.getParentFile();
- } else {
- }
- command.add("."
- + source.getCanonicalPath().replace(baseDir.getCanonicalPath(),
- ""));
- command.add(host);
- return command;
- }
- protected class ReplicationTask extends Thread {
- // Command to execute
- private List<String> command = null;
- // String representation of command
- private String cmdStr = null;
- // Working directory from where to execute the command
- private File cwd = null;
- // Number of times to try replication
- private int numTries = 1;
- // How long to let the command try for
- private long timeout = 1000L;
- // Executor service to repeat this task if appropriate
- private ExecutorService service = null;
- /**
- * Constructor of a replication task
- * @param command command to execute
- * @param cwd Direcetory to execute the command
- * @param numTries How many times to try the replication
- * @param timeout in ms
- * @param service Executor service
- */
- public ReplicationTask(final List<String> command, final File cwd,
- final int numTries, final long timeout,
- final ExecutorService service) {
- this.command = command;
- this.cwd = cwd;
- this.timeout = timeout;
- this.numTries = numTries;
- this.service = service;
- // Command string for easier viewing
- StringBuffer buf = new StringBuffer();
- Iterator<String> iter = command.iterator();
- while (iter.hasNext()) {
- buf.append(iter.next()).append(" ");
- }
- this.cmdStr = buf.toString().trim();
- }
- public void run() {
- try {
- TimeoutProcessBuilder builder = new TimeoutProcessBuilder(
- timeout, command);
- builder.directory(cwd);
- TimeoutProcess process = builder.start();
- int exitStatus = process.waitFor();
- LOGGER.info("[" + getName() + "] command \"" + cmdStr
- + "\" exited with status [" + exitStatus + "]");
- if (exitStatus != 0) {
- LOGGER.info("[" + getName() + "] command \"" + cmdStr
- + "\" error output: " + new String(process.errorOutput()));
- }
- } catch (ProcessTimeoutException cex) {
- StringBuffer message = new StringBuffer();
- message.append("[" + getName() + "] command \"").append(cmdStr)
- .append("\" timed out.");
- if (numTries > 0) {
- // Try again
- message.append(" Trying again.");
- service.submit(this);
- } else {
- message.append(" Not retrying.");
- }
- LOGGER.warning(message.toString());
- } catch (IOException iox) {
- LOGGER.log(Level.WARNING, iox.getMessage(), iox);
- } catch (InterruptedException iex) {
- LOGGER.warning(iex.getMessage());
- }
- }
- }
- }