Recently I needed a blocking queue in Java in which I could pause the reader side. Normal behavior is for queue.take() to block so that a reader only runs when there are elements in the queue to process. What I wanted was for there to also be a throttle, so I could pause reading even when there were elements to process. It is not difficult, in fact, to make such a queue in which either end might be paused or stopped, and resumed or restarted:
public class LinkedControllableBlockingQueue<E>
extends LinkedBlockingQueue<E>
implements ControllableBlockingQueue<E> {
// Read side
private final Lock readLock = new ReentrantLock();
private final Condition resumed = readLock.newCondition();
private final AtomicReference<Thread> pausingThread
= new AtomicReference<Thread>();
// Write side
private final Lock writeLock = new ReentrantLock();
private final Condition restarted = writeLock.newCondition();
// Begin started
private final AtomicReference<Thread> startingThread
= new AtomicReference<Thread>(currentThread());
// Controllable
public boolean isStarted() {
return null != startingThread.get();
}
public boolean isPaused() {
return null != pausingThread.get();
}
public void start() {
if (!startingThread.compareAndSet(null, currentThread()))
throw new IllegalStateException("Already started");
writeLock.lock();
try {
restarted.signalAll();
} finally {
writeLock.unlock();
}
}
public void pause() {
if (!pausingThread.compareAndSet(null, currentThread()))
throw new IllegalStateException("Already paused");
}
public void resume() {
if (!pausingThread.compareAndSet(currentThread(), null))
throw new IllegalStateException("Not paused by current thread");
readLock.lock();
try {
resumed.signalAll();
} finally {
readLock.unlock();
}
}
public void stop() {
if (!startingThread.compareAndSet(currentThread(), null))
throw new IllegalStateException("Not started by current thread");
}
// LinkedBlockingQueue
// Read side
@Override
public E take()
throws InterruptedException {
readLock.lock();
try {
while (isPaused())
resumed.await();
} finally {
readLock.unlock();
}
return super.take();
}
/**
* {@inheritDoc}
* <p/>
* Time spent <em>paused</em> is subtracted from <var>timeout</var>. That
* is, the total time spent in {@code poll} is the same for the pausing and
* non-pausing versions of {@code LinkedBlockingQueue} for a given
* <var>timeout</var> and <var>unit</var>.
*/
@Override
public E poll(final long timeout, final TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
readLock.lock();
try {
while (isPaused())
if (0 >= (nanos = resumed.awaitNanos(nanos)))
return null;
} finally {
readLock.unlock();
}
return super.poll(nanos, NANOSECONDS);
}
/**
* {@inheritDoc}
* <p/>
* Returns {@code null} if paused.
*/
@Override
public E poll() {
readLock.lock();
try {
if (isPaused())
return null;
} finally {
readLock.unlock();
}
return super.poll();
}
/**
* {@inheritDoc}
* <p/>
* Returns {@code null} if interrupted while paused.
*/
@Override
public E peek() {
readLock.lock();
try {
while (isPaused())
resumed.await();
} catch (final InterruptedException e) {
return null;
} finally {
readLock.unlock();
}
return super.peek();
}
/**
* {@inheritDoc}
* <p/>
* Returns {@code false} if interrupted while paused.
*/
@Override
public boolean remove(final Object o) {
readLock.lock();
try {
while (isPaused())
resumed.await();
} catch (final InterruptedException e) {
return false;
} finally {
readLock.unlock();
}
return super.remove(o);
}
/**
* {@inheritDoc}
* <p/>
* Also throws {@code NoSuchElementException} if interrupted while paused.
*/
@Override
public E remove() {
readLock.lock();
try {
while (isPaused())
resumed.await();
} catch (final InterruptedException e) {
throw new NoSuchElementException(e.getMessage());
} finally {
readLock.unlock();
}
return super.remove();
}
// Write side
@Override
public void put(final E o)
throws InterruptedException {
writeLock.lock();
try {
while (!isStarted())
restarted.await();
} finally {
writeLock.unlock();
}
super.put(o);
}
/**
* {@inheritDoc}
* <p/>
* Time spent <em>stopped</em> is subtracted from <var>timeout</var>. That
* is, the total time spent in {@code offer} is the same for the stopping
* and non-stopping versions of {@code LinkedBlockingQueue} for a given
* <var>timeout</var> and <var>unit</var>.
*/
@Override
public boolean offer(final E o, final long timeout, final TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
writeLock.lock();
try {
while (!isStarted())
if (0 >= (nanos = restarted.awaitNanos(nanos)))
return false;
} finally {
writeLock.unlock();
}
return super.offer(o, nanos, NANOSECONDS);
}
/**
* {@inheritDoc}
* <p/>
* Also returns {@code false} if interrupted while stopped.
*/
@Override
public boolean offer(final E o) {
writeLock.lock();
try {
while (!isStarted())
restarted.await();
} catch (final InterruptedException e) {
return false;
} finally {
writeLock.unlock();
}
return super.offer(o);
}
/**
* {@inheritDoc}
* <p/>
* Also returns {@code false} if stopped.
*/
@Override
public boolean add(final E o) {
return isStarted() && super.add(o);
}
}
2 comments:
BlockingQueue in Java is I guess one of the most popular new Collection class after ConcurrentHashMap. It made implementing producer consumer problem in Java like cakewalk.r
New? :)
Post a Comment