Tuesday, June 23, 2009

Dataflow concurrency for Java

I ran across this interesting Scala class by Vaclav Pech which makes the data concurrent rather than the code (if I understood the author correctly). The ~ (extraction) and << (insertion) operators looked nifty.

Looking to do the same in Java, I see four key requirements:

  1. Insertion can happen only once.
  2. Extraction is idempotent (infinitely repeatable, non-modifying).
  3. Extraction blocks until insertion has completed.
  4. Insertion and extraction are both atomic.

With these in mind, I wrote this:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DataFlowReference<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private volatile T item;
    private volatile boolean set = false;
    private volatile boolean called = false;

    public T get()
            throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (true) {
                if (set)
                    return item;

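                // Give a subclass one chance to supply the value lazily,
                // then loop to re-check whether it did.
                if (!called) {
                    called = true;
                    onFirstGet();
                    continue;
                }

                // Releases the lock while waiting; setOnce() wakes us up.
                condition.await();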
            }
        } finally {
            lock.unlock();
        }
    }

    public void setOnce(final T value) {
        lock.lock();
        try {
            if (set)
                return;

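            // Publish the value before the flag, so that the unlocked
            // readers (equals, hashCode, toString) never see set == true
            // with a stale item.
            item = value;
            set = true;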

            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    protected void onFirstGet() {
    }

    // Object

    @Override
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;

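        final DataFlowReference<?> that = (DataFlowReference<?>) o;

        // Two distinct references are equal only when both are set;
        // the comparison is null-safe in case setOnce(null) was used.
        return set && that.set
                && (item == null ? that.item == null : item.equals(that.item));
    }

    @Override
    public int hashCode() {
        return set ? (item == null ? 0 : item.hashCode()) : super.hashCode();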
    }

    @Override
    public String toString() {
        return "(" + set + ':' + item + ')';
    }
}

Use of DataFlowReference follows the examples from Vaclav's page: declare dataflow references, create threads whose runnables use setOnce() and get(), and start them all together.

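onFirstGet() supports "just in time" supply of the item: a subclass overrides it to call setOnce(T) with a fresh value the first time get() is called. The hook runs while the lock is held, which is safe because the lock is reentrant.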
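
Here is a minimal sketch of both ideas together: threads that communicate through references, plus one reference that supplies its value lazily through onFirstGet(). The class is a condensed copy of the one above (equals, hashCode and toString left out) so that the sketch compiles on its own. DataFlowDemo and sum() are names I made up for illustration, and the x + y = z arrangement is my assumption about the kind of example on Vaclav's page, not a copy of it.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Condensed copy of DataFlowReference so this sketch is self-contained.
class DataFlowReference<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private T item;          // only touched while holding the lock
    private boolean set;
    private boolean called;

    public T get() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (!set) {
                if (!called) {
                    called = true;
                    onFirstGet();      // may call setOnce(); the lock is reentrant
                } else {
                    condition.await(); // wait for another thread's setOnce()
                }
            }
            return item;
        } finally {
            lock.unlock();
        }
    }

    public void setOnce(final T value) {
        lock.lock();
        try {
            if (set)
                return;                // later insertions are ignored
            item = value;
            set = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    protected void onFirstGet() {
    }
}

// Hypothetical demo class, not part of the original post.
class DataFlowDemo {
    static int sum() {
        // x supplies its own value "just in time", on the first get().
        final DataFlowReference<Integer> x = new DataFlowReference<Integer>() {
            @Override
            protected void onFirstGet() {
                setOnce(10);
            }
        };
        final DataFlowReference<Integer> y = new DataFlowReference<Integer>();
        final DataFlowReference<Integer> z = new DataFlowReference<Integer>();

        Thread adder = new Thread(() -> {
            try {
                z.setOnce(x.get() + y.get()); // blocks until y has been set
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread setY = new Thread(() -> y.setOnce(5));

        // Start order does not matter: get() waits for the data to arrive.
        adder.start();
        setY.start();

        try {
            return z.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sum()); // prints 15
    }
}
```

The threads never coordinate with each other directly. The adder simply asks for x and y, and the blocking get() makes it wait for whichever one has not arrived yet.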

UPDATE: The original version of this class had a horrible, obvious race condition. Caveat plicator.

1 comment:

jed said...

You have basically implemented a Future with external setting semantics. For something similar, see:

http://labs.atlassian.com/source/browse/CONCURRENT/trunk/src/main/java/com/atlassian/util/concurrent/SettableFuture.java?r=2602

and a related conversation (Offering data to Futures):

http://cs.oswego.edu/pipermail/concurrency-interest/2009-May/thread.html#6161