FutureExecutorTask.java

package gov.usgs.util;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * FutureExecutorTask overrides how timeouts are handled to use a
 * separate executor service with Futures.
 */
public class FutureExecutorTask<T> extends ExecutorTask<T> {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(FutureExecutorTask.class
      .getName());

  /** Default number of milliseconds to wait before a retry. */
  public static final long DEFAULT_RETRY_DELAY = 0L;

  /** Default number of tries to run this task. */
  public static final int DEFAULT_NUM_TRIES = 1;

  /** Default timeout for this task. */
  public static final long DEFAULT_TIMEOUT = 0L;

  /** ExecutorService used to execute callable. */
  protected ExecutorService backgroundService;

  /**
   * Construct a new ExecutorTask
   *
   * @param backgroundService
   *            ExecutorService used to execute callable.
   * @param service
   *            ExecutorService that this task will be submitted to.
   * @param maxTries
   *            maximum number of tries callable can throw an exception or
   *            timeout before giving up. &lt; 1 means never run.
   * @param timeout
   *            number of milliseconds to allow callable to run before it is
   *            interrupted. &lt;= 0 means never timeout.
   * @param callable
   *            the callable to call. To work well, the callable should handle
   *            interrupts gracefully.
   * @see InterruptedException
   */
  public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
      int maxTries, long timeout, Callable<T> callable) {
    super(service, maxTries, timeout, callable, null, DEFAULT_RETRY_DELAY);
    this.backgroundService = backgroundService;
  }

  /**
   * Wraps a runnable and result using the CallableRunnable class.
   *
   * @param backgroundService
   *            ExecutorService used to execute callable
   * @param service
   *            ExecutorService that this task will be submitted to.
   * @param maxTries
   *            maximum number of tries callable can throw an exception or
   *            timeout before giving up. &lt; 1 means never run.
   * @param timeout
   *            number of milliseconds to allow callable to run before it is
   *            interrupted. &lt;= 0 means never timeout.
   * @param runnable a runnable
   * @param result the result passed to Executors callable
   *
   * @see java.util.concurrent.Executors#callable(Runnable, Object)
   */
  public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
      int maxTries, long timeout, Runnable runnable, T result) {
    super(service, maxTries, timeout, Executors.callable(runnable, result));
    this.backgroundService = backgroundService;
  }

  /**
   * Construct a new FutureExecutorTask
   *
   * @param backgroundService
   *            ExecutorService used to execute callable
   * @param service
   *            ExecutorService that this task will be submitted to.
   * @param maxTries
   *            maximum number of tries callable can throw an exception or
   *            timeout before giving up. &lt; 1 means never run.
   * @param timeout
   *            number of milliseconds to allow callable to run before it is
   *            interrupted. &lt;= 0 means never timeout.
   * @param callable
   *            the callable to call. To work well, the callable should handle
   *            interrupts gracefully.
   * @param retryTimer
   *            a timer used to schedule retries when retryDelay is non-zero.
   * @param retryDelay
   *            the number of milliseconds to wait before retrying after an
   *            exception.
   * @see InterruptedException on interrupted
   */
  public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
      int maxTries, long timeout, Callable<T> callable, Timer retryTimer,
      long retryDelay) {
    super(service, maxTries, timeout, callable, retryTimer, retryDelay);
    this.backgroundService = backgroundService;
  }

  /**
   * Run calls the callable, scheduling timeout interruption, catching
   * exceptions, and potentially resubmitting to the executor service.
   */
  @Override
  public void run() {
    Future<T> future = null;
    try {
      if (done || cancelled || numTries >= maxTries) {
        // already done, cancelled, or out of attempts
        return;
      }

      // otherwise,
      ++numTries;

      // signal that we are running
      runThread = Thread.currentThread();

      // use future to manage timeout
      future = backgroundService.submit(this.callable);
      try {
        if (timeout > 0) {
          result = future.get(timeout, TimeUnit.MILLISECONDS);
        } else {
          result = future.get();
        }
      } finally {
        // cancel whether successful (noop) or exception (interrupt callable)
        future.cancel(true);
      }

      // signal that we are done running
      runThread = null;

      // computed without exceptions, done
      setDone();
    } catch (Exception e) {
      if (e instanceof ExecutionException) {
        // unpack cause
        Throwable cause = e.getCause();
        if (cause != null && cause instanceof Exception) {
          e = (Exception) cause;
        }
      }
      LOGGER.log(Level.INFO, "Exception executing task", e);
      // signal that we are not running
      runThread = null;

      // track this exception
      exceptions.add(e);

      // try to resubmit
      if (!cancelled && numTries < maxTries) {
        LOGGER.info("Resubmitting task to executor " + numTries + "/"
            + maxTries + " attempts");
        SubmitTaskToExecutor retryTask = new SubmitTaskToExecutor(this);
        if (retryDelay <= 0L || retryTimer == null) {
          retryTask.run();
        } else {
          retryTimer.schedule(retryTask, retryDelay);
        }
      } else {
        // cancelled or out of tries, done
        setDone();
      }
    }
  }

  /**
   * Submit a FutureExecutorTask to an ExecutorService.
   *
   * Used to defer resubmission of a task after it fails, but scheduling its
   * resubmission using a timer.
   */
  private class SubmitTaskToExecutor extends TimerTask {

    /** The task to resubmit. */
    private FutureExecutorTask<T> task;

    /**
     * Construct a new SubmitTaskToExecutor instance.
     *
     * @param task
     *            the task to resubmit.
     */
    public SubmitTaskToExecutor(final FutureExecutorTask<T> task) {
      this.task = task;
    }

    /**
     * Submits the task to the executor.
     */
    public void run() {
      service.submit(task);
    }

  }

}