Thursday, January 18, 2007

Controlling a blocking queue in Java

Recently I needed a blocking queue in Java in which I could pause the reader side. Normal behavior is for queue.take() to block so that a reader only runs when there are elements in the queue to process. What I wanted was for there to also be a throttle, so I could pause reading even when there were elements to process. It is not difficult, in fact, to make such a queue in which either end might be paused or stopped, and resumed or restarted:

public class LinkedControllableBlockingQueue<E>
        extends LinkedBlockingQueue<E>
        implements ControllableBlockingQueue<E> {
    // Read side
    private final Lock readLock = new ReentrantLock();
    private final Condition resumed = readLock.newCondition();
    private final AtomicReference<Thread> pausingThread
            = new AtomicReference<Thread>();

    // Write side
    private final Lock writeLock = new ReentrantLock();
    private final Condition restarted = writeLock.newCondition();
    // Begin started
    private final AtomicReference<Thread> startingThread
            = new AtomicReference<Thread>(currentThread());

    // Controllable

    public boolean isStarted() {
        return null != startingThread.get();
    }

    public boolean isPaused() {
        return null != pausingThread.get();
    }

    public void start() {
        if (!startingThread.compareAndSet(null, currentThread()))
            throw new IllegalStateException("Already started");

        writeLock.lock();
        try {
            restarted.signalAll();
        } finally {
            writeLock.unlock();
        }
    }

    public void pause() {
        if (!pausingThread.compareAndSet(null, currentThread()))
            throw new IllegalStateException("Already paused");
    }

    public void resume() {
        if (!pausingThread.compareAndSet(currentThread(), null))
            throw new IllegalStateException("Not paused by current thread");

        readLock.lock();
        try {
            resumed.signalAll();
        } finally {
            readLock.unlock();
        }
    }

    public void stop() {
        if (!startingThread.compareAndSet(currentThread(), null))
            throw new IllegalStateException("Not started by current thread");
    }

    // LinkedBlockingQueue

    // Read side

    @Override
    public E take()
            throws InterruptedException {
        readLock.lock();
        try {
            while (isPaused())
                resumed.await();
        } finally {
            readLock.unlock();
        }

        return super.take();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Time spent <em>paused</em> is subtracted from <var>timeout</var>.  That
     * is, the total time spent in {@code poll} is the same for the pausing and
     * non-pausing versions of {@code LinkedBlockingQueue} for a given
     * <var>timeout</var> and <var>unit</var>.
     */
    @Override
    public E poll(final long timeout, final TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);

        readLock.lock();
        try {
            while (isPaused())
                if (0 >= (nanos = resumed.awaitNanos(nanos)))
                    return null;
        } finally {
            readLock.unlock();
        }

        return super.poll(nanos, NANOSECONDS);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns {@code null} if paused.
     */
    @Override
    public E poll() {
        readLock.lock();
        try {
            if (isPaused())
                return null;
        } finally {
            readLock.unlock();
        }

        return super.poll();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns {@code null} if interrupted while paused.
     */
    @Override
    public E peek() {
        readLock.lock();
        try {
            while (isPaused())
                resumed.await();
        } catch (final InterruptedException e) {
            return null;
        } finally {
            readLock.unlock();
        }

        return super.peek();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns {@code false} if interrupted while paused.
     */
    @Override
    public boolean remove(final Object o) {
        readLock.lock();
        try {
            while (isPaused())
                resumed.await();
        } catch (final InterruptedException e) {
            return false;
        } finally {
            readLock.unlock();
        }

        return super.remove(o);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Also throws {@code NoSuchElementException} if interrupted while paused.
     */
    @Override
    public E remove() {
        readLock.lock();
        try {
            while (isPaused())
                resumed.await();
        } catch (final InterruptedException e) {
            throw new NoSuchElementException(e.getMessage());
        } finally {
            readLock.unlock();
        }

        return super.remove();
    }

    // Write side

    @Override
    public void put(final E o)
            throws InterruptedException {
        writeLock.lock();
        try {
            while (!isStarted())
                restarted.await();
        } finally {
            writeLock.unlock();
        }

        super.put(o);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Time spent <em>stopped</em> is subtracted from <var>timeout</var>.  That
     * is, the total time spent in {@code offer} is the same for the stopping
     * and non-stopping versions of {@code LinkedBlockingQueue} for a given
     * <var>timeout</var> and <var>unit</var>.
     */
    @Override
    public boolean offer(final E o, final long timeout, final TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);

        writeLock.lock();
        try {
            while (!isStarted())
                if (0 >= (nanos = restarted.awaitNanos(nanos)))
                    return false;
        } finally {
            writeLock.unlock();
        }

        return super.offer(o, nanos, NANOSECONDS);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Also returns {@code false} if interrupted while stopped.
     */
    @Override
    public boolean offer(final E o) {
        writeLock.lock();
        try {
            while (!isStarted())
                restarted.await();
        } catch (final InterruptedException e) {
            return false;
        } finally {
            writeLock.unlock();
        }

        return super.offer(o);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Also returns {@code false} if stopped.
     */
    @Override
    public boolean add(final E o) {
        return isStarted() && super.add(o);
    }
}