FutureExecutorTask.java
package gov.usgs.util;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* FutureExecutorTask overrides how timeouts are handled to use a
* separate executor service with Futures.
*/
public class FutureExecutorTask<T> extends ExecutorTask<T> {
/** Logging object. */
private static final Logger LOGGER = Logger.getLogger(FutureExecutorTask.class
.getName());
/** Default number of milliseconds to wait before a retry. */
public static final long DEFAULT_RETRY_DELAY = 0L;
/** Default number of tries to run this task. */
public static final int DEFAULT_NUM_TRIES = 1;
/** Default timeout for this task. */
public static final long DEFAULT_TIMEOUT = 0L;
/** ExecutorService used to execute callable. */
protected ExecutorService backgroundService;
/**
* Construct a new ExecutorTask
*
* @param backgroundService
* ExecutorService used to execute callable.
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param callable
* the callable to call. To work well, the callable should handle
* interrupts gracefully.
* @see InterruptedException
*/
public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
int maxTries, long timeout, Callable<T> callable) {
super(service, maxTries, timeout, callable, null, DEFAULT_RETRY_DELAY);
this.backgroundService = backgroundService;
}
/**
* Wraps a runnable and result using the CallableRunnable class.
*
* @param backgroundService
* ExecutorService used to execute callable
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param runnable a runnable
* @param result the result passed to Executors callable
*
* @see java.util.concurrent.Executors#callable(Runnable, Object)
*/
public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
int maxTries, long timeout, Runnable runnable, T result) {
super(service, maxTries, timeout, Executors.callable(runnable, result));
this.backgroundService = backgroundService;
}
/**
* Construct a new FutureExecutorTask
*
* @param backgroundService
* ExecutorService used to execute callable
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param callable
* the callable to call. To work well, the callable should handle
* interrupts gracefully.
* @param retryTimer
* a timer used to schedule retries when retryDelay is non-zero.
* @param retryDelay
* the number of milliseconds to wait before retrying after an
* exception.
* @see InterruptedException on interrupted
*/
public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service,
int maxTries, long timeout, Callable<T> callable, Timer retryTimer,
long retryDelay) {
super(service, maxTries, timeout, callable, retryTimer, retryDelay);
this.backgroundService = backgroundService;
}
/**
* Run calls the callable, scheduling timeout interruption, catching
* exceptions, and potentially resubmitting to the executor service.
*/
@Override
public void run() {
Future<T> future = null;
try {
if (done || cancelled || numTries >= maxTries) {
// already done, cancelled, or out of attempts
return;
}
// otherwise,
++numTries;
// signal that we are running
runThread = Thread.currentThread();
// use future to manage timeout
future = backgroundService.submit(this.callable);
try {
if (timeout > 0) {
result = future.get(timeout, TimeUnit.MILLISECONDS);
} else {
result = future.get();
}
} finally {
// cancel whether successful (noop) or exception (interrupt callable)
future.cancel(true);
}
// signal that we are done running
runThread = null;
// computed without exceptions, done
setDone();
} catch (Exception e) {
if (e instanceof ExecutionException) {
// unpack cause
Throwable cause = e.getCause();
if (cause != null && cause instanceof Exception) {
e = (Exception) cause;
}
}
LOGGER.log(Level.INFO, "Exception executing task", e);
// signal that we are not running
runThread = null;
// track this exception
exceptions.add(e);
// try to resubmit
if (!cancelled && numTries < maxTries) {
LOGGER.info("Resubmitting task to executor " + numTries + "/"
+ maxTries + " attempts");
SubmitTaskToExecutor retryTask = new SubmitTaskToExecutor(this);
if (retryDelay <= 0L || retryTimer == null) {
retryTask.run();
} else {
retryTimer.schedule(retryTask, retryDelay);
}
} else {
// cancelled or out of tries, done
setDone();
}
}
}
/**
* Submit a FutureExecutorTask to an ExecutorService.
*
* Used to defer resubmission of a task after it fails, but scheduling its
* resubmission using a timer.
*/
private class SubmitTaskToExecutor extends TimerTask {
/** The task to resubmit. */
private FutureExecutorTask<T> task;
/**
* Construct a new SubmitTaskToExecutor instance.
*
* @param task
* the task to resubmit.
*/
public SubmitTaskToExecutor(final FutureExecutorTask<T> task) {
this.task = task;
}
/**
* Submits the task to the executor.
*/
public void run() {
service.submit(task);
}
}
}