RoundRobinBlockingQueue.java

  1. package gov.usgs.earthquake.util;

  2. import java.util.Collection;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.locks.Condition;
  6. import java.util.concurrent.locks.ReentrantLock;

  7. /**
  8.  * Round Robin Blocking Queue.
  9.  *
  10.  * {@link #put(Object)} and {@link #take()} are recommended, as other methods
  11.  * internally call these methods.
  12.  *
  13.  * @param <T> queue item type.
  14.  */
  15. public class RoundRobinBlockingQueue<T> extends RoundRobinQueue<T> implements
  16.         BlockingQueue<T> {

  17.     private final ReentrantLock changeLock;
  18.     private final Condition notEmptyCondition;

  19.     /** Constructor */
  20.     public RoundRobinBlockingQueue() {
  21.         changeLock = new ReentrantLock();
  22.         notEmptyCondition = changeLock.newCondition();
  23.     }

  24.     /**
  25.      * Add an item to the queue.
  26.      */
  27.     @Override
  28.     public boolean add(T e) {
  29.         try {
  30.             changeLock.lockInterruptibly();
  31.         } catch (InterruptedException ie) {
  32.             throw new RuntimeException(ie);
  33.         }
  34.         try {
  35.             super.add(e);
  36.             notEmptyCondition.signal();
  37.         } finally {
  38.             changeLock.unlock();
  39.         }
  40.         return true;
  41.     }

  42.     /**
  43.      * Check if queue contains an item.
  44.      */
  45.     @Override
  46.     public boolean contains(Object e) {
  47.         try {
  48.             changeLock.lockInterruptibly();
  49.         } catch (InterruptedException ie) {
  50.             throw new RuntimeException(ie);
  51.         }
  52.         try {
  53.             return super.contains(e);
  54.         } finally {
  55.             changeLock.unlock();
  56.         }
  57.     }

  58.     /**
  59.      * Offer an item to the queue.
  60.      *
  61.      * Calls {@link #add(Object)}, but returns false if any exceptions thrown.
  62.      */
  63.     @Override
  64.     public boolean offer(T e) {
  65.         try {
  66.             return add(e);
  67.         } catch (Exception ex) {
  68.             return false;
  69.         }
  70.     }

  71.     /**
  72.      * Offer an item to the queue.
  73.      *
  74.      * Same as {@link #offer(Object)}, this is an unbounded queue.
  75.      */
  76.     @Override
  77.     public boolean offer(T e, long timeout, TimeUnit unit)
  78.             throws InterruptedException {
  79.         changeLock.tryLock(timeout, unit);
  80.         try {
  81.             super.add(e);
  82.             notEmptyCondition.signal();
  83.         } finally {
  84.             changeLock.unlock();
  85.         }
  86.         return true;
  87.     }

  88.     /**
  89.      * Retrieves and removes the head of this queue, waiting up to the specified
  90.      * wait time if necessary for an element to become available.
  91.      */
  92.     @Override
  93.     public T poll(long timeout, TimeUnit unit) throws InterruptedException {
  94.         changeLock.lockInterruptibly();
  95.         try {
  96.             if (isEmpty()) {
  97.                 notEmptyCondition.await(timeout, unit);
  98.             }
  99.             try {
  100.                 return remove();
  101.             } catch (Exception e) {
  102.                 return null;
  103.             }
  104.         } finally {
  105.             changeLock.unlock();
  106.         }
  107.     }

  108.     /**
  109.      * Put an item in the queue.
  110.      *
  111.      * @throws InterruptedException
  112.      *             if interrupted while acquiring lock.
  113.      */
  114.     @Override
  115.     public void put(T e) throws InterruptedException {
  116.         changeLock.lockInterruptibly();
  117.         try {
  118.             super.add(e);
  119.             notEmptyCondition.signal();
  120.         } catch (RuntimeException re) {
  121.             // may be thrown by add if interrupted
  122.             if (re.getCause() instanceof InterruptedException) {
  123.                 throw (InterruptedException) re.getCause();
  124.             }
  125.         } finally {
  126.             changeLock.unlock();
  127.         }
  128.     }

  129.     /**
  130.      * Unbounded queues return Integer.MAX_VALUE.
  131.      *
  132.      * @return Integer.MAX_VALUE;
  133.      */
  134.     @Override
  135.     public int remainingCapacity() {
  136.         return Integer.MAX_VALUE;
  137.     }

  138.     /**
  139.      * Remove an object from the queue.
  140.      */
  141.     @Override
  142.     public boolean remove(Object e) {
  143.         try {
  144.             changeLock.lockInterruptibly();
  145.         } catch (InterruptedException ie) {
  146.             throw new RuntimeException(ie);
  147.         }
  148.         try {
  149.             return super.remove(e);
  150.         } finally {
  151.             changeLock.unlock();
  152.         }
  153.     }

  154.     /**
  155.      * Remove an object from the queue.
  156.      */
  157.     @Override
  158.     public T take() throws InterruptedException {
  159.         changeLock.lockInterruptibly();
  160.         try {
  161.             while (isEmpty()) {
  162.                 notEmptyCondition.await();
  163.             }
  164.             return super.remove();
  165.         } finally {
  166.             changeLock.unlock();
  167.         }
  168.     }

  169.     /**
  170.      * Empty queue into a collection.
  171.      */
  172.     @Override
  173.     public int drainTo(Collection<? super T> c) {
  174.         return drainTo(c, -1);
  175.     }

  176.     /**
  177.      * Empty queue into a collection, stopping after max elements.
  178.      */
  179.     @Override
  180.     public int drainTo(Collection<? super T> c, int max) {
  181.         try {
  182.             changeLock.lockInterruptibly();
  183.         } catch (InterruptedException e) {
  184.             // none drained
  185.             return 0;
  186.         }
  187.         try {
  188.             int count = 0;
  189.             while (!isEmpty() && (max < 0 || count < max)) {
  190.                 c.add(remove());
  191.                 count++;
  192.             }
  193.             return count;
  194.         } finally {
  195.             changeLock.unlock();
  196.         }
  197.     }

  198. }