Monday, June 20, 2011

New Shimmer for Java Concurrency

At work we use this Java idiom to coordinate start up:

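import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;

public class LatchExecutorService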
        implements ExecutorService {
    private final ExecutorService threadPool
            = newSingleThreadExecutor();
    private final CountDownLatch latch;

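    public LatchExecutorService(final int count) {
        latch = new CountDownLatch(count);
        // Gate task: it holds the only worker thread until the
        // latch opens, so every task queued behind it must wait.
        threadPool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    // Queued tasks still run; new ones are rejected.
                    threadPool.shutdown();
                } catch (final InterruptedException e) {
                    // Interrupted before the latch opened.
                    threadPool.shutdownNow();
                }
            }
        });
    }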

    public void countDown() {
        latch.countDown();
    }

    // CountDownLatch.getCount() returns a long
    public long getCount() {
        return latch.getCount();
    }

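    public <T> Future<T> submit(final Callable<T> task) {
        try {
            return threadPool.submit(task);
        } catch (final RejectedExecutionException e) {
            // Inner pool has shut down: run in the caller's thread
            return runNow(new FutureTask<T>(task));
        }
    }

    public <T> Future<T> submit(final Runnable task,
            final T result) {
        try {
            return threadPool.submit(task, result);
        } catch (final RejectedExecutionException e) {
            return runNow(new FutureTask<T>(task, result));
        }
    }

    public Future<?> submit(final Runnable task) {
        try {
            return threadPool.submit(task);
        } catch (final RejectedExecutionException e) {
            return runNow(new FutureTask<Void>(task, null));
        }
    }

    // Runs the task synchronously and returns it as an
    // already-completed future; a failure surfaces from get()
    private static <T> Future<T> runNow(
            final FutureTask<T> future) {
        future.run();
        return future;
    }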

    // Delegate remaining executor service methods
    // Cut for brevity
}

This class is like a count down latch but with the extra "runnable" feature of a cyclic barrier. When the latch fully counts down, it executes all tasks submitted so far, in order of submission. It is also an executor service.

The unusual submit semantics need some explanation. The class is intended for use at start up: it delays actions that depend on some common initialization, yet guarantees they execute without the parties having to coordinate.

Say there are four worker threads at start up performing essential work. The rest of the program works in a reduced state until these four complete. Rather than carry four independent signals, I collapsed them all into one LatchExecutorService created with a count of four; each worker calls countDown() when it finishes.

As tasks are submitted, LatchExecutorService delays their execution until its latch fully counts down. With a blocking task at the head of the single-threaded inner executor service, all other tasks must wait on the latch. When the latch triggers, the blocking task clears and all waiting tasks execute on the inner executor service thread.
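The gating trick can be tried on its own, outside the full class. What follows is only a minimal sketch under my own assumptions: the class name LatchGateSketch, the record helper, and the event names are hypothetical, and a count of two stands in for the four workers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchGateSketch {
    /** Hypothetical helper: a task that records its name when it runs. */
    static Runnable record(final List<String> events, final String name) {
        return new Runnable() {
            @Override
            public void run() {
                events.add(name);
            }
        };
    }

    /** Runs the scenario and returns the events in the order they happened. */
    static List<String> run() {
        final List<String> events = new CopyOnWriteArrayList<String>();
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService pool = Executors.newSingleThreadExecutor();

        // Gate task: holds the pool's only thread until the latch opens.
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Queued behind the gate, so neither can start yet.
        pool.submit(record(events, "first"));
        pool.submit(record(events, "second"));

        events.add("initialized"); // stands in for the start up work
        latch.countDown();
        latch.countDown();         // count reaches zero; the gate task returns

        pool.shutdown();           // already-queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [initialized, first, second]
    }
}
```

The two recorded tasks are submitted before "initialized" is logged, yet they always come after it, and in submission order, because the single worker thread is parked on the latch until the final countDown().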

Once the latch triggers, I no longer need the inner executor service except to clear out waiting tasks, so I call shutdown(). That lets the queued tasks finish, rejects anything new, and then frees up the thread.

After that, I want any newly submitted tasks to run immediately. Rather than have them fail or need to coordinate between slow submitters and fast count downers, I run submitted tasks in their caller's thread on rejection. This guarantees new tasks eventually execute without any coordination, either on the original inner executor service or directly at submission.
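Here is a small sketch of that fallback in isolation, assuming a hypothetical helper named submitOrRun (it is not part of the class above). It shuts a pool down, submits a task anyway, and reports which thread ended up running it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;

final class CallerRunsSketch {
    /** Submits to the pool, or runs the task in the caller's thread if rejected. */
    static <T> Future<T> submitOrRun(final ExecutorService pool,
            final Callable<T> task) {
        try {
            return pool.submit(task);
        } catch (final RejectedExecutionException e) {
            final FutureTask<T> future = new FutureTask<T>(task);
            future.run();  // executes synchronously, right here
            return future; // already completed when returned
        }
    }

    /** Returns the name of the thread that ran a task submitted after shutdown(). */
    static String threadUsedAfterShutdown() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // from here on the pool rejects new tasks
        final Future<String> result = submitOrRun(pool, new Callable<String>() {
            @Override
            public String call() {
                return Thread.currentThread().getName();
            }
        });
        try {
            return result.get();
        } catch (final Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        // The rejected task ran on the submitting thread.
        System.out.println(threadUsedAfterShutdown()
                .equals(Thread.currentThread().getName())); // prints true
    }
}
```

The built-in ThreadPoolExecutor.CallerRunsPolicy looks like it should do the same job, but it discards the task when the executor has been shut down, which is exactly the case that matters here. Catching RejectedExecutionException at the submit call avoids that.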

You can also view LatchExecutorService as an inside-out cyclic barrier. Rather than one runnable registered at construction to execute when all parties signal, callers can provide as many runnables as they like at any time.
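For comparison, this is roughly what the plain cyclic barrier side of that analogy looks like. It is a sketch with hypothetical names (BarrierActionSketch, the event strings), using two parties to keep it short.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

final class BarrierActionSketch {
    /** Trips a two-party barrier once and returns the observed events. */
    static List<String> run() {
        final List<String> events = new CopyOnWriteArrayList<String>();

        // Exactly one action, and it must be supplied at construction.
        final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                events.add("barrier action");
            }
        });

        final Runnable party = new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await(); // blocks until both parties arrive
                    events.add("released");
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (final BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            }
        };

        final Thread other = new Thread(party);
        other.start();
        party.run(); // the calling thread acts as the second party
        try {
            other.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(final String[] args) {
        // prints [barrier action, released, released]
        System.out.println(run());
    }
}
```

The barrier action runs after the last party arrives and before any party is released, but there is no way to add a second action later. With LatchExecutorService the parties only count down, and the "actions" are whatever tasks anyone has submitted.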

It's a count down latch; it's a cyclic barrier; it's an executor service. It's New Shimmer.