RoundRobinBlockingQueue.java

package gov.usgs.earthquake.util;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Round Robin Blocking Queue.
 *
 * {@link #put(Object)} and {@link #take()} are recommended, as other methods
 * internally call these methods.
 *
 * @param <T> queue item type.
 */
public class RoundRobinBlockingQueue<T> extends RoundRobinQueue<T> implements
		BlockingQueue<T> {

	private final ReentrantLock changeLock;
	private final Condition notEmptyCondition;

	/** Constructor */
	public RoundRobinBlockingQueue() {
		changeLock = new ReentrantLock();
		notEmptyCondition = changeLock.newCondition();
	}

	/**
	 * Add an item to the queue.
	 */
	@Override
	public boolean add(T e) {
		try {
			changeLock.lockInterruptibly();
		} catch (InterruptedException ie) {
			throw new RuntimeException(ie);
		}
		try {
			super.add(e);
			notEmptyCondition.signal();
		} finally {
			changeLock.unlock();
		}
		return true;
	}

	/**
	 * Check if queue contains an item.
	 */
	@Override
	public boolean contains(Object e) {
		try {
			changeLock.lockInterruptibly();
		} catch (InterruptedException ie) {
			throw new RuntimeException(ie);
		}
		try {
			return super.contains(e);
		} finally {
			changeLock.unlock();
		}
	}

	/**
	 * Offer an item to the queue.
	 *
	 * Calls {@link #add(Object)}, but returns false if any exceptions thrown.
	 */
	@Override
	public boolean offer(T e) {
		try {
			return add(e);
		} catch (Exception ex) {
			return false;
		}
	}

	/**
	 * Offer an item to the queue.
	 *
	 * Same as {@link #offer(Object)}, this is an unbounded queue.
	 */
	@Override
	public boolean offer(T e, long timeout, TimeUnit unit)
			throws InterruptedException {
		changeLock.tryLock(timeout, unit);
		try {
			super.add(e);
			notEmptyCondition.signal();
		} finally {
			changeLock.unlock();
		}
		return true;
	}

	/**
	 * Retrieves and removes the head of this queue, waiting up to the specified
	 * wait time if necessary for an element to become available.
	 */
	@Override
	public T poll(long timeout, TimeUnit unit) throws InterruptedException {
		changeLock.lockInterruptibly();
		try {
			if (isEmpty()) {
				notEmptyCondition.await(timeout, unit);
			}
			try {
				return remove();
			} catch (Exception e) {
				return null;
			}
		} finally {
			changeLock.unlock();
		}
	}

	/**
	 * Put an item in the queue.
	 *
	 * @throws InterruptedException
	 *             if interrupted while acquiring lock.
	 */
	@Override
	public void put(T e) throws InterruptedException {
		changeLock.lockInterruptibly();
		try {
			super.add(e);
			notEmptyCondition.signal();
		} catch (RuntimeException re) {
			// may be thrown by add if interrupted
			if (re.getCause() instanceof InterruptedException) {
				throw (InterruptedException) re.getCause();
			}
		} finally {
			changeLock.unlock();
		}
	}

	/**
	 * Unbounded queues return Integer.MAX_VALUE.
	 *
	 * @return Integer.MAX_VALUE;
	 */
	@Override
	public int remainingCapacity() {
		return Integer.MAX_VALUE;
	}

	/**
	 * Remove an object from the queue.
	 */
	@Override
	public boolean remove(Object e) {
		try {
			changeLock.lockInterruptibly();
		} catch (InterruptedException ie) {
			throw new RuntimeException(ie);
		}
		try {
			return super.remove(e);
		} finally {
			changeLock.unlock();
		}
	}

	/**
	 * Remove an object from the queue.
	 */
	@Override
	public T take() throws InterruptedException {
		changeLock.lockInterruptibly();
		try {
			while (isEmpty()) {
				notEmptyCondition.await();
			}
			return super.remove();
		} finally {
			changeLock.unlock();
		}
	}

	/**
	 * Empty queue into a collection.
	 */
	@Override
	public int drainTo(Collection<? super T> c) {
		return drainTo(c, -1);
	}

	/**
	 * Empty queue into a collection, stopping after max elements.
	 */
	@Override
	public int drainTo(Collection<? super T> c, int max) {
		try {
			changeLock.lockInterruptibly();
		} catch (InterruptedException e) {
			// none drained
			return 0;
		}
		try {
			int count = 0;
			while (!isEmpty() && (max < 0 || count < max)) {
				c.add(remove());
				count++;
			}
			return count;
		} finally {
			changeLock.unlock();
		}
	}

}