Recently I needed a blocking queue in Java in which I could pause the reader side. Normal behavior is for queue.take()
to block so that a reader only runs when there are elements in the queue to process. What I wanted was for there to also be a throttle, so I could pause reading even when there were elements to process. It is not difficult, in fact, to make such a queue in which either end might be paused or stopped, and resumed or restarted:
public class LinkedControllableBlockingQueue<E> extends LinkedBlockingQueue<E> implements ControllableBlockingQueue<E> { // Read side private final Lock readLock = new ReentrantLock(); private final Condition resumed = readLock.newCondition(); private final AtomicReference<Thread> pausingThread = new AtomicReference<Thread>(); // Write side private final Lock writeLock = new ReentrantLock(); private final Condition restarted = writeLock.newCondition(); // Begin started private final AtomicReference<Thread> startingThread = new AtomicReference<Thread>(currentThread()); // Controllable public boolean isStarted() { return null != startingThread.get(); } public boolean isPaused() { return null != pausingThread.get(); } public void start() { if (!startingThread.compareAndSet(null, currentThread())) throw new IllegalStateException("Already started"); writeLock.lock(); try { restarted.signalAll(); } finally { writeLock.unlock(); } } public void pause() { if (!pausingThread.compareAndSet(null, currentThread())) throw new IllegalStateException("Already paused"); } public void resume() { if (!pausingThread.compareAndSet(currentThread(), null)) throw new IllegalStateException("Not paused by current thread"); readLock.lock(); try { resumed.signalAll(); } finally { readLock.unlock(); } } public void stop() { if (!startingThread.compareAndSet(currentThread(), null)) throw new IllegalStateException("Not started by current thread"); } // LinkedBlockingQueue // Read side @Override public E take() throws InterruptedException { readLock.lock(); try { while (isPaused()) resumed.await(); } finally { readLock.unlock(); } return super.take(); } /** * {@inheritDoc} * <p/> * Time spent <em>paused</em> is subtracted from <var>timeout</var>. That * is, the total time spent in {@code poll} is the same for the pausing and * non-pausing versions of {@code LinkedBlockingQueue} for a given * <var>timeout</var> and <var>unit</var>. */ @Override public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); readLock.lock(); try { while (isPaused()) if (0 >= (nanos = resumed.awaitNanos(nanos))) return null; } finally { readLock.unlock(); } return super.poll(nanos, NANOSECONDS); } /** * {@inheritDoc} * <p/> * Returns {@code null} if paused. */ @Override public E poll() { readLock.lock(); try { if (isPaused()) return null; } finally { readLock.unlock(); } return super.poll(); } /** * {@inheritDoc} * <p/> * Returns {@code null} if interrupted while paused. */ @Override public E peek() { readLock.lock(); try { while (isPaused()) resumed.await(); } catch (final InterruptedException e) { return null; } finally { readLock.unlock(); } return super.peek(); } /** * {@inheritDoc} * <p/> * Returns {@code false} if interrupted while paused. */ @Override public boolean remove(final Object o) { readLock.lock(); try { while (isPaused()) resumed.await(); } catch (final InterruptedException e) { return false; } finally { readLock.unlock(); } return super.remove(o); } /** * {@inheritDoc} * <p/> * Also throws {@code NoSuchElementException} if interrupted while paused. */ @Override public E remove() { readLock.lock(); try { while (isPaused()) resumed.await(); } catch (final InterruptedException e) { throw new NoSuchElementException(e.getMessage()); } finally { readLock.unlock(); } return super.remove(); } // Write side @Override public void put(final E o) throws InterruptedException { writeLock.lock(); try { while (!isStarted()) restarted.await(); } finally { writeLock.unlock(); } super.put(o); } /** * {@inheritDoc} * <p/> * Time spent <em>stopped</em> is subtracted from <var>timeout</var>. That * is, the total time spent in {@code offer} is the same for the stopping * and non-stopping versions of {@code LinkedBlockingQueue} for a given * <var>timeout</var> and <var>unit</var>. */ @Override public boolean offer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); writeLock.lock(); try { while (!isStarted()) if (0 >= (nanos = restarted.awaitNanos(nanos))) return false; } finally { writeLock.unlock(); } return super.offer(o, nanos, NANOSECONDS); } /** * {@inheritDoc} * <p/> * Also returns {@code false} if interrupted while stopped. */ @Override public boolean offer(final E o) { writeLock.lock(); try { while (!isStarted()) restarted.await(); } catch (final InterruptedException e) { return false; } finally { writeLock.unlock(); } return super.offer(o); } /** * {@inheritDoc} * <p/> * Also returns {@code false} if stopped. */ @Override public boolean add(final E o) { return isStarted() && super.add(o); } }