RoundRobinBlockingQueue.java
- package gov.usgs.earthquake.util;
- import java.util.Collection;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- /**
-  * Blocking variant of {@link RoundRobinQueue}.
-  *
-  * <p>
-  * The methods overridden here acquire a single {@link ReentrantLock} before
-  * delegating to the superclass. Every successful insert signals a condition
-  * that {@link #take()} and {@link #poll(long, TimeUnit)} wait on. The queue is
-  * treated as unbounded (see {@link #remainingCapacity()}), so inserts never
-  * wait for space; they only wait for the lock.
-  *
-  * @param <T> queue item type.
-  */
- public class RoundRobinBlockingQueue<T> extends RoundRobinQueue<T> implements
-         BlockingQueue<T> {
-     /** Guards every queue operation overridden in this class. */
-     private final ReentrantLock changeLock;
-     /** Signalled after each insert; consumers wait on it while the queue is empty. */
-     private final Condition notEmptyCondition;
-     /** Constructor */
-     public RoundRobinBlockingQueue() {
-         changeLock = new ReentrantLock();
-         notEmptyCondition = changeLock.newCondition();
-     }
-     /**
-      * Acquire {@code changeLock} on behalf of a method that cannot declare
-      * {@link InterruptedException}.
-      *
-      * @throws RuntimeException
-      *             wrapping the InterruptedException if interrupted while
-      *             waiting for the lock; the thread's interrupt status is
-      *             restored before throwing so callers can still observe it.
-      */
-     private void lockUnchecked() {
-         try {
-             changeLock.lockInterruptibly();
-         } catch (InterruptedException ie) {
-             Thread.currentThread().interrupt();
-             throw new RuntimeException(ie);
-         }
-     }
-     /**
-      * Add an item to the queue.
-      *
-      * @param e
-      *            item to add.
-      * @return always true.
-      * @throws RuntimeException
-      *             if interrupted while acquiring the lock.
-      */
-     @Override
-     public boolean add(T e) {
-         lockUnchecked();
-         try {
-             super.add(e);
-             notEmptyCondition.signal();
-         } finally {
-             changeLock.unlock();
-         }
-         return true;
-     }
-     /**
-      * Check if queue contains an item.
-      *
-      * @param e
-      *            item to look for.
-      * @return result of the superclass contains check, made while holding
-      *         the lock.
-      * @throws RuntimeException
-      *             if interrupted while acquiring the lock.
-      */
-     @Override
-     public boolean contains(Object e) {
-         lockUnchecked();
-         try {
-             return super.contains(e);
-         } finally {
-             changeLock.unlock();
-         }
-     }
-     /**
-      * Offer an item to the queue.
-      *
-      * Calls {@link #add(Object)}, but returns false if any exceptions thrown.
-      *
-      * @param e
-      *            item to add.
-      * @return true if added, false if add threw an exception.
-      */
-     @Override
-     public boolean offer(T e) {
-         try {
-             return add(e);
-         } catch (RuntimeException ex) {
-             // add declares no checked exceptions, so this covers every failure
-             return false;
-         }
-     }
-     /**
-      * Offer an item to the queue, waiting up to the timeout for the lock.
-      *
-      * This is an unbounded queue, so the timeout only limits how long to wait
-      * for the lock, never for free space.
-      *
-      * @param e
-      *            item to add.
-      * @param timeout
-      *            how long to wait for the lock.
-      * @param unit
-      *            unit of timeout.
-      * @return true if added, false if the lock was not acquired in time.
-      * @throws InterruptedException
-      *             if interrupted while acquiring lock.
-      */
-     @Override
-     public boolean offer(T e, long timeout, TimeUnit unit)
-             throws InterruptedException {
-         // tryLock reports a timeout by returning false; continuing without the
-         // lock would mutate the queue unguarded and make unlock() throw.
-         if (!changeLock.tryLock(timeout, unit)) {
-             return false;
-         }
-         try {
-             super.add(e);
-             notEmptyCondition.signal();
-         } finally {
-             changeLock.unlock();
-         }
-         return true;
-     }
-     /**
-      * Retrieves and removes the head of this queue, waiting up to the specified
-      * wait time if necessary for an element to become available.
-      *
-      * @param timeout
-      *            how long to wait for an element.
-      * @param unit
-      *            unit of timeout.
-      * @return the removed element, or null if the queue is still empty when
-      *         the wait time elapses.
-      * @throws InterruptedException
-      *             if interrupted while acquiring lock or waiting.
-      */
-     @Override
-     public T poll(long timeout, TimeUnit unit) throws InterruptedException {
-         long nanosLeft = unit.toNanos(timeout);
-         changeLock.lockInterruptibly();
-         try {
-             // Loop: a wakeup may be spurious, or another consumer may have
-             // taken the item first. awaitNanos returns the time still left.
-             while (isEmpty()) {
-                 if (nanosLeft <= 0L) {
-                     return null;
-                 }
-                 nanosLeft = notEmptyCondition.awaitNanos(nanosLeft);
-             }
-             return remove();
-         } finally {
-             changeLock.unlock();
-         }
-     }
-     /**
-      * Put an item in the queue.
-      *
-      * @param e
-      *            item to add.
-      * @throws InterruptedException
-      *             if interrupted while acquiring lock, or if the superclass
-      *             add reports an interruption wrapped in a RuntimeException.
-      */
-     @Override
-     public void put(T e) throws InterruptedException {
-         changeLock.lockInterruptibly();
-         try {
-             super.add(e);
-             notEmptyCondition.signal();
-         } catch (RuntimeException re) {
-             // unwrap an interruption reported as an unchecked exception
-             if (re.getCause() instanceof InterruptedException) {
-                 throw (InterruptedException) re.getCause();
-             }
-             // any other failure means the item was not added; do not hide it
-             throw re;
-         } finally {
-             changeLock.unlock();
-         }
-     }
-     /**
-      * Unbounded queues return Integer.MAX_VALUE.
-      *
-      * @return Integer.MAX_VALUE;
-      */
-     @Override
-     public int remainingCapacity() {
-         return Integer.MAX_VALUE;
-     }
-     /**
-      * Remove an object from the queue.
-      *
-      * @param e
-      *            object to remove.
-      * @return result of the superclass remove, made while holding the lock.
-      * @throws RuntimeException
-      *             if interrupted while acquiring the lock.
-      */
-     @Override
-     public boolean remove(Object e) {
-         lockUnchecked();
-         try {
-             return super.remove(e);
-         } finally {
-             changeLock.unlock();
-         }
-     }
-     /**
-      * Retrieves and removes the next item, waiting until one is available.
-      *
-      * @return the removed item.
-      * @throws InterruptedException
-      *             if interrupted while acquiring lock or waiting.
-      */
-     @Override
-     public T take() throws InterruptedException {
-         changeLock.lockInterruptibly();
-         try {
-             while (isEmpty()) {
-                 notEmptyCondition.await();
-             }
-             return super.remove();
-         } finally {
-             changeLock.unlock();
-         }
-     }
-     /**
-      * Empty queue into a collection.
-      *
-      * @param c
-      *            collection that receives the items.
-      * @return number of items moved.
-      */
-     @Override
-     public int drainTo(Collection<? super T> c) {
-         return drainTo(c, -1);
-     }
-     /**
-      * Empty queue into a collection, stopping after max elements.
-      *
-      * @param c
-      *            collection that receives the items.
-      * @param max
-      *            maximum number of items to move; a negative value means no
-      *            limit.
-      * @return number of items moved; 0 if interrupted while acquiring the
-      *         lock (the interrupt status is left set in that case).
-      * @throws NullPointerException
-      *             if c is null.
-      * @throws IllegalArgumentException
-      *             if c is this queue.
-      */
-     @Override
-     public int drainTo(Collection<? super T> c, int max) {
-         if (c == null) {
-             throw new NullPointerException("c");
-         }
-         if (c == this) {
-             // each removed item would be re-added through the reentrant
-             // lock, so an unlimited drain would never finish
-             throw new IllegalArgumentException("cannot drain queue into itself");
-         }
-         try {
-             changeLock.lockInterruptibly();
-         } catch (InterruptedException e) {
-             // none drained; keep the interrupt visible to the caller
-             Thread.currentThread().interrupt();
-             return 0;
-         }
-         try {
-             int count = 0;
-             while (!isEmpty() && (max < 0 || count < max)) {
-                 c.add(remove());
-                 count++;
-             }
-             return count;
-         } finally {
-             changeLock.unlock();
-         }
-     }
- }