ExecutorTask.java
/*
* ExecutorTask
*
* $Id$
* $URL$
*/
package gov.usgs.util;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A wrapper for Runnable or Callable objects for use with an ExecutorService.
*
* Can be used to schedule interrupt based timeouts, multiple attempts, and
* Future style exception tracking for Runnable or Callable objects.
*
* @param <T> return type for callable.
*/
public class ExecutorTask<T> implements Future<T>, Runnable {
/** Logging object. */
private static final Logger LOGGER = Logger.getLogger(ExecutorTask.class
.getName());
/** Default number of milliseconds to wait before a retry. */
public static final long DEFAULT_RETRY_DELAY = 0L;
/** Default number of tries to run this task. */
public static final int DEFAULT_NUM_TRIES = 1;
/** Default timeout for this task. */
public static final long DEFAULT_TIMEOUT = 0L;
/** ExecutorService used to execute this task. */
protected ExecutorService service;
/** The callable to be called. */
protected Callable<T> callable;
/** Timeout for task. */
protected long timeout = DEFAULT_TIMEOUT;
/** Number of tries to execute this task. */
protected int maxTries = DEFAULT_NUM_TRIES;
/** Number of milliseconds to wait before trying again. */
protected long retryDelay = DEFAULT_RETRY_DELAY;
/** Timer used to schedule retries, when they have a non-zero delay. */
protected Timer retryTimer;
/** The future from the executor service. */
protected T result;
/** List of exceptions thrown, up to maxTries in length. */
ArrayList<Exception> exceptions;
/** Whether this task is complete. */
protected Boolean done = false;
/** Whether this task has been canceled. */
protected Boolean cancelled = false;
/** Number of tries used. */
protected int numTries = 0;
/** The thread where this is running, used to interrupt. */
protected Thread runThread = null;
/** Name for this task. */
protected String name = null;
/** A synchronized object */
protected final Object syncObject = new Object();
/**
* Construct a new ExecutorTask
*
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param callable
* the callable to call. To work well, the callable should handle
* interrupts gracefully.
* @see InterruptedException
*/
public ExecutorTask(ExecutorService service, int maxTries, long timeout,
Callable<T> callable) {
this(service, maxTries, timeout, callable, null, DEFAULT_RETRY_DELAY);
}
/**
* Wraps a runnable and result using the CallableRunnable class.
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param runnable a runnable
* @param result the result
*
* @see java.util.concurrent.Executors#callable(Runnable, Object)
*/
public ExecutorTask(ExecutorService service, int maxTries, long timeout,
Runnable runnable, T result) {
this(service, maxTries, timeout, Executors.callable(runnable, result));
}
/**
* Construct a new ExecutorTask
*
* @param service
* ExecutorService that this task will be submitted to.
* @param maxTries
* maximum number of tries callable can throw an exception or
* timeout before giving up. < 1 means never run.
* @param timeout
* number of milliseconds to allow callable to run before it is
* interrupted. <= 0 means never timeout.
* @param callable
* the callable to call. To work well, the callable should handle
* interrupts gracefully.
* @param retryTimer
* a timer used to schedule retries when retryDelay is non-zero.
* @param retryDelay
* the number of milliseconds to wait before retrying after an
* exception.
* @see InterruptedException
*/
public ExecutorTask(ExecutorService service, int maxTries, long timeout,
Callable<T> callable, Timer retryTimer, long retryDelay) {
this.service = service;
this.maxTries = maxTries;
this.timeout = timeout;
this.callable = callable;
this.exceptions = new ArrayList<Exception>(maxTries);
this.retryTimer = retryTimer;
this.retryDelay = retryDelay;
}
/**
* Run calls the callable, scheduling timeout interruption, catching
* exceptions, and potentially resubmitting to the executor service.
*/
@Override
public void run() {
// used to schedule timeout
Timer timeoutTimer = new Timer();
try {
// synchronized (this) {
if (done || cancelled || numTries >= maxTries) {
// already done, cancelled, or out of attempts
return;
}
// otherwise,
++numTries;
// signal that we are running
runThread = Thread.currentThread();
if (timeout > 0) {
// schedule interrupt
final Thread currentThread = runThread;
timeoutTimer.schedule(new TimerTask() {
@Override
public void run() {
LOGGER.fine("Interrupting executor thread");
currentThread.interrupt();
}
}, timeout);
}
// }
// compute result, outside synchronized
result = callable.call();
// synchronized (this) {
// signal that we are done running
runThread = null;
// computed without exceptions, done
setDone();
// }
} catch (Exception e) {
LOGGER.log(Level.INFO, "Exception executing task", e);
// synchronized (this) {
// signal that we are not running
runThread = null;
// track this exception
exceptions.add(e);
// try to resubmit
if (!cancelled && numTries < maxTries) {
LOGGER.info("Resubmitting task to executor " + numTries + "/"
+ maxTries + " attempts");
SubmitTaskToExecutor retryTask = new SubmitTaskToExecutor(this);
if (retryDelay <= 0L || retryTimer == null) {
retryTask.run();
} else {
retryTimer.schedule(retryTask, retryDelay);
}
} else {
// cancelled or out of tries, done
setDone();
}
// }
} finally {
// cancel timeout based interrupt
timeoutTimer.cancel();
}
}
/**
* Called when task is completed, either successfully, or unsuccessfully and
* has no more tries
*/
protected void setDone() {
// done running, either successfully or because out of tries
done = true;
// notify anyone waiting for task to complete
synchronized (syncObject) {
syncObject.notifyAll();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (cancelled || done) {
// already canceled or complete
return cancelled;
}
cancelled = true;
if (runThread != null && mayInterruptIfRunning) {
// running, try to interrupt
runThread.interrupt();
}
// thread may still be running, if it doesn't handle interrupts well,
// but Future interface says we are done
setDone();
return cancelled;
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return done;
}
/**
* Get the result returned by the callable.
*/
@Override
public T get() throws InterruptedException, ExecutionException {
while (!cancelled && !done && numTries < maxTries) {
synchronized (syncObject) {
syncObject.wait();
}
}
if (numTries == maxTries && exceptions.size() == maxTries) {
// can't execute any more, signal using most recent exception
throw new ExecutionException(exceptions.get(maxTries - 1));
}
return result;
}
/**
* Get the result returned by the callable.
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
if (!cancelled && !done && numTries < maxTries) {
synchronized (syncObject) {
unit.timedWait(syncObject, timeout);
}
}
if (!cancelled && !done) {
// must have timed out
throw new TimeoutException();
}
if (numTries == maxTries && exceptions.size() == maxTries) {
// can't execute any more, signal using most recent exception
throw new ExecutionException(exceptions.get(maxTries - 1));
}
return result;
}
/**
* Number of tries used.
*
* @return actual number of attempts.
*/
public int getNumTries() {
return numTries;
}
/**
* Maximum number of tries before giving up.
*
* @return maximum number of attempts.
*/
public int getMaxTries() {
return maxTries;
}
/**
* Any exceptions thrown, during any execution attempt.
*
* @return array of thrown exceptions. should contain no more than numTries
* exceptions.
*/
public ArrayList<Exception> getExceptions() {
return exceptions;
}
/**
* The callable object that is/was called.
*
* @return The callable object for this task. If this task was created using
* a runnable, this was created using Executors.callable(Runnable).
*/
public Callable<T> getCallable() {
return callable;
}
/** @return name */
public String getName() {
return this.name;
}
/** @param name to set */
public void setName(final String name) {
this.name = name;
}
/**
* @return the retryDelay
*/
public long getRetryDelay() {
return retryDelay;
}
/**
* @param retryDelay
* the retryDelay to set
*/
public void setRetryDelay(long retryDelay) {
this.retryDelay = retryDelay;
}
/**
* @return the retryTimer
*/
public Timer getRetryTimer() {
return retryTimer;
}
/**
* @param retryTimer
* the retryTimer to set
*/
public void setRetryTimer(Timer retryTimer) {
this.retryTimer = retryTimer;
}
/**
* Submit an ExecutorTask to an ExecutorService.
*
* Used to defer resubmission of a task after it fails, but scheduling its
* resubmission using a timer.
*/
private class SubmitTaskToExecutor extends TimerTask {
/** The task to resubmit. */
private ExecutorTask<T> task;
/**
* Construct a new SubmitTaskToExecutor instance.
*
* @param task
* the task to resubmit.
*/
public SubmitTaskToExecutor(final ExecutorTask<T> task) {
this.task = task;
}
/**
* Submits the task to the executor.
*/
public void run() {
service.submit(task);
}
}
}