ReplicationStorageListener.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.earthquake.product.ProductId;
  3. import gov.usgs.util.Config;
  4. import gov.usgs.util.ProcessTimeoutException;
  5. import gov.usgs.util.StringUtils;
  6. import gov.usgs.util.TimeoutProcess;
  7. import gov.usgs.util.TimeoutProcessBuilder;

  8. import java.io.File;
  9. import java.io.IOException;
  10. import java.util.ArrayList;
  11. import java.util.HashMap;
  12. import java.util.Iterator;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.concurrent.ExecutorService;
  16. import java.util.concurrent.Executors;
  17. import java.util.logging.Level;
  18. import java.util.logging.Logger;

  19. public class ReplicationStorageListener extends DefaultStorageListener {

  20.     private static final Logger LOGGER = Logger
  21.             .getLogger(ReplicationStorageListener.class.getName());

  22.     /**
  23.      * Name of the property specifying whether to use archive flag on the
  24.      * replication.
  25.      */
  26.     public static final String ARCHIVE_FLAG_PROPERTY = "archiveSync";

  27.     /**
  28.      * Name of the property specifying the replication command on the host
  29.      * system.
  30.      */
  31.     public static final String REPL_CMD_PROPERTY = "rsync";

  32.     /**
  33.      * Name of property indicating how many times the replication should be
  34.      * attempted before considering it a failure.
  35.      */
  36.     public static final String REPL_MAX_TRIES_PROPERTY = "maxTries";

  37.     /**
  38.      * Name of the property specifying how long to wait for the replication to
  39.      * complete successfully.
  40.      */
  41.     public static final String REPL_TIMEOUT_PROPERTY = "timeout";

  42.     /**
  43.      * Name of property specifying to which hosts the storage should be
  44.      * replicated.
  45.      */
  46.     public static final String REPL_HOSTS_PROPERTY = "targetHosts";

  47.     /** Default. Use archiving. */
  48.     private static final boolean ARCHIVE_FLAG_DEFAULT = true;

  49.     /** Default replication command */
  50.     private static final String REPL_CMD_DEFAULT = "rsync";

  51.     /** Default number of times to try replication. */
  52.     private static final int REPL_MAX_TRIES_DEFAULT = 1;

  53.     /** Default replication timeout (milliseconds). */
  54.     private static final long REPL_TIMEOUT_DEFAULT = 30000L;

  55.     /** Default replication hosts. None. */
  56.     private static final Map<String, ExecutorService> REPL_HOSTS_DEFAULT = new HashMap<String, ExecutorService>();

  57.     private boolean archiveFlag = ARCHIVE_FLAG_DEFAULT;
  58.     private String replCmd = REPL_CMD_DEFAULT;
  59.     private int replMaxTries = REPL_MAX_TRIES_DEFAULT;
  60.     private long replTimeout = REPL_TIMEOUT_DEFAULT;
  61.     private Map<String, ExecutorService> replHosts = REPL_HOSTS_DEFAULT;

  62.     /**
  63.      * Default constructor used when this object is instantiated via
  64.      * configuration.
  65.      */
  66.     public ReplicationStorageListener() {
  67.     }

  68.     /**
  69.      * Customer initialization of the constructor
  70.      * @param archiveFlag Bool flag of what to do on archive
  71.      * @param replCmd Replication command on host system
  72.      * @param replTimeout Replication in ms
  73.      * @param replHosts List of Replication hosts
  74.      */
  75.     public ReplicationStorageListener(final boolean archiveFlag,
  76.             String replCmd, final long replTimeout, final List<String> replHosts) {
  77.         this.archiveFlag = archiveFlag;
  78.         this.replCmd = replCmd;
  79.         this.replTimeout = replTimeout;
  80.         setReplHosts(replHosts);
  81.     }

  82.     /**
  83.      * Set new Replication hosts
  84.      * @param replHosts string list of new hosts
  85.      */
  86.     protected void setReplHosts(List<String> replHosts) {
  87.         this.replHosts = new HashMap<String, ExecutorService>();
  88.         Iterator<String> replHostsIter = replHosts.iterator();
  89.         while (replHostsIter.hasNext()) {
  90.             String replHost = replHostsIter.next();
  91.             ExecutorService service = Executors.newSingleThreadExecutor();
  92.             this.replHosts.put(replHost, service);
  93.         }
  94.     }

  95.     @Override
  96.     public void configure(Config config) {

  97.         // -- Configure the archive flag property
  98.         try {
  99.             String useArchive = config.getProperty(ARCHIVE_FLAG_PROPERTY);
  100.             if ("TRUE".equalsIgnoreCase(useArchive)) {
  101.                 archiveFlag = true;
  102.             } else {
  103.                 archiveFlag = false;
  104.             }
  105.         } catch (Exception ex) {
  106.             LOGGER.warning("[" + getName()
  107.                     + "] replicationStorageListener::Archive flag " + ""
  108.                     + "misconfigured. Using default.");
  109.             archiveFlag = ARCHIVE_FLAG_DEFAULT;
  110.         }

  111.         // -- Configure the replication command property
  112.         try {
  113.             replCmd = config.getProperty(REPL_CMD_PROPERTY);
  114.             if (replCmd == null || "".equals(replCmd)) {
  115.                 replCmd = REPL_CMD_DEFAULT;
  116.             }
  117.         } catch (Exception ex) {
  118.             LOGGER.warning("[" + getName()
  119.                     + "] replicationStorageListener::Exception "
  120.                     + "configuring replication command. (" + ex.getMessage()
  121.                     + ")");
  122.         }

  123.         // -- Configure the replication max tries property
  124.         try {
  125.             replMaxTries = Integer.parseInt(config
  126.                     .getProperty(REPL_MAX_TRIES_PROPERTY));
  127.         } catch (NumberFormatException npx) {
  128.             LOGGER.warning("[" + getName()
  129.                     + "] replicationStorageListener::Bad value for "
  130.                     + "replication max tries. Using default.");
  131.             replTimeout = REPL_MAX_TRIES_DEFAULT;
  132.         } catch (NullPointerException npx) {
  133.             // User didn't configure timeout. Just use default; no warning.
  134.             replTimeout = REPL_MAX_TRIES_DEFAULT;
  135.         }

  136.         // -- Configure the replication timeout property
  137.         try {
  138.             replTimeout = Long.parseLong(config
  139.                     .getProperty(REPL_TIMEOUT_PROPERTY));
  140.         } catch (NumberFormatException npx) {
  141.             LOGGER.warning("[" + getName()
  142.                     + "] replicationStorageListener::Bad value for "
  143.                     + "replication timeout. Using default.");
  144.             replTimeout = REPL_TIMEOUT_DEFAULT;
  145.         } catch (NullPointerException npx) {
  146.             // User didn't configure timeout. Just use default; no warning.
  147.             replTimeout = REPL_TIMEOUT_DEFAULT;
  148.         }

  149.         // -- Configure the replication hosts property
  150.         try {
  151.             setReplHosts(StringUtils.split(
  152.                     config.getProperty(REPL_HOSTS_PROPERTY), ","));
  153.         } catch (Exception ex) {
  154.             LOGGER.warning("["
  155.                     + getName()
  156.                     + "] replicationStorageListener::No replication hosts configured.");
  157.             replHosts = REPL_HOSTS_DEFAULT;
  158.         }
  159.     }

  160.     @Override
  161.     public void onProductStored(StorageEvent event) throws Exception {
  162.         if (!(event.getProductStorage() instanceof FileProductStorage)) {
  163.             return; // Can't replicate a non-file product storage
  164.         }
  165.         LOGGER.info("[" + getName() + "] product stored. Replicating. ("
  166.                 + event.getProductId().toString() + ")");
  167.         syncProductContents((FileProductStorage) event.getProductStorage(),
  168.                 event.getProductId(), false);
  169.         LOGGER.info("[" + getName() + "] product replicated to remote. ("
  170.                 + event.getProductId().toString() + ")");
  171.     }

  172.     @Override
  173.     public void onProductRemoved(StorageEvent event) throws Exception {
  174.         if (!(event.getProductStorage() instanceof FileProductStorage)) {
  175.             return; // Can't replicate a non-file product storage
  176.         }

  177.         LOGGER.info("[" + getName() + "] product removed. Replicating. ("
  178.                 + event.getProductId().toString() + ")");
  179.         syncProductContents((FileProductStorage) event.getProductStorage(),
  180.                 event.getProductId(), true);
  181.         LOGGER.info("[" + getName() + "] product removal replicated. ("
  182.                 + event.getProductId().toString() + ")");
  183.     }

  184.     /**
  185.      *
  186.      * @param storage FileProductStorage to use as the base directory
  187.      * @param id ID of product in storage
  188.      * @param deleting Bool flag for deleting
  189.      * @throws IOException if IO error occurs
  190.      */
  191.     protected void syncProductContents(FileProductStorage storage,
  192.             ProductId id, boolean deleting) throws IOException {

  193.         final File baseDir = storage.getBaseDirectory();
  194.         final String path = storage.getProductPath(id);

  195.         Iterator<String> replHostsIter = replHosts.keySet().iterator();
  196.         while (replHostsIter.hasNext()) {
  197.             final String replHost = replHostsIter.next();
  198.             final ExecutorService service = replHosts.get(replHost);
  199.             service.submit(new ReplicationTask(createReplicationCommand(
  200.                     baseDir, path, replHost, deleting), baseDir, replMaxTries,
  201.                     replTimeout, service));
  202.         }
  203.     }

  204.     /**
  205.      * Create the replication command.
  206.      *
  207.      * @param baseDir
  208.      *            The directory from which replication will be executed.
  209.      * @param path
  210.      *            The path of the content to replicate
  211.      * @param host
  212.      *            The host string to which content should be replicated. Format
  213.      *            = user@host:path
  214.      * @param deleting
  215.      *            Flag whether this should be a deleting replication or not
  216.      *
  217.      * @return The command and arguments as a list suitable for a
  218.      *         <code>ProcessBuilder</code>.
  219.      * @throws IOException if IO error occurs
  220.      */
  221.     protected List<String> createReplicationCommand(final File baseDir,
  222.             final String path, final String host, final boolean deleting) throws IOException {

  223.         // Make sure we are replicating a directory that actually exists
  224.         File source = new File(baseDir, path);

  225.         while (!source.exists() && !source.getParentFile().equals(baseDir)) {
  226.             source = source.getParentFile();
  227.         }

  228.         // StringBuffer command = new StringBuffer(replCmd);
  229.         List<String> command = new ArrayList<String>();
  230.         command.add(replCmd);

  231.         if (archiveFlag) {
  232.             command.add("-a");
  233.         }

  234.         command.add("-vz");
  235.         command.add("--relative");
  236.         command.add("-e");
  237.         command.add("ssh -o ConnectTimeout=5");

  238.         if (deleting) {
  239.             // To do a delete we must sync the parent directory and then
  240.             // explicitly include the original target directory and exclude
  241.             // everything else.
  242.             command.add("--delete");
  243.             command.add("--include='" + source.getName() + "**'");
  244.             command.add("--exclude='*'");
  245.             source = source.getParentFile();
  246.         } else {

  247.         }

  248.         command.add("."
  249.                 + source.getCanonicalPath().replace(baseDir.getCanonicalPath(),
  250.                         ""));

  251.         command.add(host);

  252.         return command;
  253.     }
  254.     protected class ReplicationTask extends Thread {

  255.         // Command to execute
  256.         private List<String> command = null;
  257.         // String representation of command
  258.         private String cmdStr = null;
  259.         // Working directory from where to execute the command
  260.         private File cwd = null;
  261.         // Number of times to try replication
  262.         private int numTries = 1;
  263.         // How long to let the command try for
  264.         private long timeout = 1000L;
  265.         // Executor service to repeat this task if appropriate
  266.         private ExecutorService service = null;

  267.         /**
  268.          * Constructor of a replication task
  269.          * @param command command to execute
  270.          * @param cwd Direcetory to execute the command
  271.          * @param numTries How many times to try the replication
  272.          * @param timeout in ms
  273.          * @param service Executor service
  274.          */
  275.         public ReplicationTask(final List<String> command, final File cwd,
  276.                 final int numTries, final long timeout,
  277.                 final ExecutorService service) {
  278.             this.command = command;
  279.             this.cwd = cwd;
  280.             this.timeout = timeout;
  281.             this.numTries = numTries;
  282.             this.service = service;

  283.             // Command string for easier viewing
  284.             StringBuffer buf = new StringBuffer();
  285.             Iterator<String> iter = command.iterator();
  286.             while (iter.hasNext()) {
  287.                 buf.append(iter.next()).append(" ");
  288.             }
  289.             this.cmdStr = buf.toString().trim();

  290.         }

  291.         public void run() {
  292.             try {
  293.                 TimeoutProcessBuilder builder = new TimeoutProcessBuilder(
  294.                         timeout, command);
  295.                 builder.directory(cwd);
  296.                 TimeoutProcess process = builder.start();
  297.                 int exitStatus = process.waitFor();

  298.                 LOGGER.info("[" + getName() + "] command \"" + cmdStr
  299.                         + "\" exited with status [" + exitStatus + "]");
  300.                 if (exitStatus != 0) {
  301.                     LOGGER.info("[" + getName() + "] command \"" + cmdStr
  302.                             + "\" error output: " + new String(process.errorOutput()));
  303.                 }
  304.             } catch (ProcessTimeoutException cex) {

  305.                 StringBuffer message = new StringBuffer();
  306.                 message.append("[" + getName() + "] command \"").append(cmdStr)
  307.                         .append("\" timed out.");

  308.                 if (numTries > 0) {
  309.                     // Try again
  310.                     message.append(" Trying again.");
  311.                     service.submit(this);
  312.                 } else {
  313.                     message.append(" Not retrying.");
  314.                 }
  315.                 LOGGER.warning(message.toString());
  316.             } catch (IOException iox) {
  317.                 LOGGER.log(Level.WARNING, iox.getMessage(), iox);
  318.             } catch (InterruptedException iex) {
  319.                 LOGGER.warning(iex.getMessage());
  320.             }
  321.         }
  322.     }
  323. }