Wednesday, April 22, 2009

Jumping the work queue in Executor

My server has an ExecutorService for job scheduling. It came up that we need to have some jobs "jump the queue", that is, go to the front of the line waiting for the next free thread. What to do?

A web search came up empty. After hemming and hawing, and polling other teams, I went with this solution (below). Comments and improvements welcome.

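import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import static java.util.concurrent.TimeUnit.SECONDS;

public class PriorityExecutor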
        extends ThreadPoolExecutor {
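    // Note: the priority queue is unbounded, so the pool never grows beyond
    // its core size. With a core size of 0 that means a single worker thread;
    // raise the core size if you want more jobs running in parallel.
    public PriorityExecutor() {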
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()));
    }

    public PriorityExecutor(final ThreadFactory threadFactory) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), threadFactory);
    }

    public PriorityExecutor(final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), handler);
    }

    public PriorityExecutor(final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        if (callable instanceof Important)
            return new PriorityTask<T>(((Important) callable).getPriority(),
                    callable);
        else
            return new PriorityTask<T>(0, callable);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable,
            final T value) {
        if (runnable instanceof Important)
            return new PriorityTask<T>(((Important) runnable).getPriority(),
                    runnable, value);
        else
            return new PriorityTask<T>(0, runnable, value);
    }

    public interface Important {
        int getPriority();
    }

    private static final class PriorityTask<T>
            extends FutureTask<T>
            implements Comparable<PriorityTask<T>> {
        private final int priority;

        public PriorityTask(final int priority, final Callable<T> tCallable) {
            super(tCallable);

            this.priority = priority;
        }

        public PriorityTask(final int priority, final Runnable runnable,
                final T result) {
            super(runnable, result);

            this.priority = priority;
        }

        @Override
        public int compareTo(final PriorityTask<T> o) {
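            // Widen to long before subtracting so extreme priorities cannot
            // overflow int. Higher priority sorts first.
            final long diff = (long) o.priority - priority;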
            return 0 == diff ? 0 : 0 > diff ? -1 : 1;
        }
    }

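    // Only tasks created by newTaskFor (that is, via submit or invokeAll) are
    // PriorityTasks. A bare Runnable handed to execute() would fail the cast.
    private static class PriorityTaskComparator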
            implements Comparator<Runnable> {
        @Override
        public int compare(final Runnable left, final Runnable right) {
            return ((PriorityTask) left).compareTo((PriorityTask) right);
        }
    }
}

Some points:

  • On JDK 5 you will also need to override the three submit methods, each along these lines:
        @Override
        public Future<?> submit(final Runnable task) {
            if (task == null)
                throw new NullPointerException();
            final RunnableFuture<Object> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
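    The newTaskFor extension points, and RunnableFuture itself, were only added in JDK 6, so on JDK 5 declare newTaskFor yourself (without @Override) and use FutureTask in place of RunnableFuture.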
  • It might be useful to include a ThreadFactory which gives important threads higher scheduling priority. It should wrap whatever factory was passed to the constructor rather than replace it.
  • Remember to take advantage of Thread.NORM_PRIORITY and its cohorts (Thread.MIN_PRIORITY, Thread.MAX_PRIORITY).
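
For the ThreadFactory idea, here is a minimal sketch of a wrapper that delegates to an existing factory and then adjusts the scheduling priority. PriorityThreadFactory is a hypothetical name, not part of the class above. Keep in mind that pool threads are reused across jobs, so this raises the priority of every worker the factory creates rather than of individual jobs, and thread priority is only a hint to the scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper: lets another factory build the thread, then sets its priority. */
final class PriorityThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate;
    private final int threadPriority;

    PriorityThreadFactory(final ThreadFactory delegate, final int threadPriority) {
        if (delegate == null)
            throw new NullPointerException("delegate");
        if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY)
            throw new IllegalArgumentException("priority out of range: " + threadPriority);
        this.delegate = delegate;
        this.threadPriority = threadPriority;
    }

    @Override
    public Thread newThread(final Runnable runnable) {
        // Keep whatever naming, daemon status, etc. the wrapped factory applies.
        final Thread thread = delegate.newThread(runnable);
        if (thread != null)
            thread.setPriority(threadPriority);
        return thread;
    }

    public static void main(final String[] args) {
        final ThreadFactory factory = new PriorityThreadFactory(
                Executors.defaultThreadFactory(), Thread.MAX_PRIORITY);
        final Thread thread = factory.newThread(new Runnable() {
            public void run() { }
        });
        System.out.println(thread.getPriority());
    }
}
```

An instance of this could be passed to the PriorityExecutor constructors that accept a ThreadFactory.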

Callers create their own Runnable and Callable classes which implement Important to communicate their priority. Then simply submit instances of these as usual (with submit rather than execute, so that newTaskFor gets a chance to wrap them). Jobs with the highest priority jump to the front of the work queue in the executor waiting for the next free thread.
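
To see the queue jumping in action, here is a small self-contained sketch. It is a cut-down, single-threaded stand-in for the executor above, not the class itself: QueueJumpDemo, Job and runDemo are made-up names, and only the Runnable flavour of newTaskFor is overridden to keep it short. The one worker is parked on a latch while three jobs are queued, so the order they run in afterwards shows the priority ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueJumpDemo {
    /** Plays the role of PriorityExecutor.Important. */
    interface Important { int getPriority(); }

    /** FutureTask whose natural ordering puts higher priorities first. */
    static final class PriorityTask<T> extends FutureTask<T>
            implements Comparable<PriorityTask<?>> {
        final int priority;
        PriorityTask(final int priority, final Runnable runnable, final T result) {
            super(runnable, result);
            this.priority = priority;
        }
        @Override
        public int compareTo(final PriorityTask<?> o) {
            return Integer.compare(o.priority, priority);
        }
    }

    /** A caller-defined job that records its own priority when it runs. */
    static final class Job implements Runnable, Important {
        private final int priority;
        private final List<Integer> log;
        Job(final int priority, final List<Integer> log) {
            this.priority = priority;
            this.log = log;
        }
        @Override public int getPriority() { return priority; }
        @Override public void run() { log.add(priority); }
    }

    static List<Integer> runDemo() {
        final List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        final CountDownLatch gate = new CountDownLatch(1);
        // One worker only, so everything submitted after the first task must queue.
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(final Runnable r, final T value) {
                final int p = r instanceof Important ? ((Important) r).getPriority() : 0;
                return new PriorityTask<T>(p, r, value);
            }
        };
        try {
            // Park the worker until all three jobs are waiting in the queue.
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        gate.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            executor.submit(new Job(1, log));
            executor.submit(new Job(5, log));
            executor.submit(new Job(3, log));
            gate.countDown();
            executor.shutdown(); // queued jobs still run after shutdown()
            if (!executor.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("jobs did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<Integer>(log);
    }

    public static void main(final String[] args) {
        System.out.println(runDemo()); // prints [5, 3, 1]
    }
}
```

The jobs were submitted in the order 1, 5, 3 but run in the order 5, 3, 1.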

Monday, April 20, 2009

Fluent assertions, bonus questions

Alex Ruiz tipped me off to an update to FEST, a fluent assertion library. Thanks, Alex!

Bonus questions. Everyone knows this idiom:

<K, V> V get(final K key) {
    final V value = concurrentMap.get(key);
    if (null != value) return value;
    concurrentMap.putIfAbsent(key, factoryMethodForV());
    return concurrentMap.get(key);
}

Easier question: How do you support valid null values for the concurrent map? (Hint: monads)

Harder question: Suppose factoryMethodForV() is expensive. How do you guarantee it is only ever called once for a given key?
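
For readers who want a starting point on the harder question, one common approach (not necessarily the answer intended here) is to store a Future in the map instead of the value, so that only the thread which wins putIfAbsent ever runs the factory. The sketch below assumes a hypothetical OnceOnlyCache class and Factory interface. As a side effect the map never holds a null itself, so a null V is a perfectly valid cached result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical cache that invokes its factory at most once per key. */
final class OnceOnlyCache<K, V> {
    /** Stand-in for factoryMethodForV(); the name is made up for this sketch. */
    interface Factory<K, V> { V create(K key) throws Exception; }

    private final ConcurrentMap<K, Future<V>> map =
            new ConcurrentHashMap<K, Future<V>>();
    private final Factory<K, V> factory;

    OnceOnlyCache(final Factory<K, V> factory) {
        this.factory = factory;
    }

    V get(final K key) {
        Future<V> future = map.get(key);
        if (future == null) {
            final FutureTask<V> task = new FutureTask<V>(new Callable<V>() {
                public V call() throws Exception {
                    return factory.create(key);
                }
            });
            future = map.putIfAbsent(key, task);
            if (future == null) {
                // This thread won the race: it alone runs the factory.
                future = task;
                task.run();
            }
        }
        try {
            // Losers of the race block here until the winner has finished.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted waiting for " + key, e);
        } catch (ExecutionException e) {
            // Caveat: a failed computation stays cached in this sketch.
            throw new IllegalStateException("factory failed for " + key, e.getCause());
        }
    }
}
```

The cost is that callers for the same key wait on each other while the factory runs, which is usually what you want when the factory is expensive.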

UPDATE: Fixed some typos.