Wednesday, April 22, 2009

Jumping the work queue in Executor

My server has an ExecutorService for job scheduling. It came up that we needed some jobs to "jump the queue", that is, go to the front of the line waiting for the next free thread. What to do?

A web search came up empty. After hemming and hawing, and polling other teams, I went with this solution (below). Comments and improvements welcome.

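import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

public class PriorityExecutor
        extends ThreadPoolExecutor {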
    public PriorityExecutor() {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()));
    }

    public PriorityExecutor(final ThreadFactory threadFactory) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), threadFactory);
    }

    public PriorityExecutor(final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), handler);
    }

    public PriorityExecutor(final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11,
                        new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        if (callable instanceof Important)
            return new PriorityTask<T>(((Important) callable).getPriority(),
                    callable);
        else
            return new PriorityTask<T>(0, callable);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable,
            final T value) {
        if (runnable instanceof Important)
            return new PriorityTask<T>(((Important) runnable).getPriority(),
                    runnable, value);
        else
            return new PriorityTask<T>(0, runnable, value);
    }

    public interface Important {
        int getPriority();
    }

    private static final class PriorityTask<T>
            extends FutureTask<T>
            implements Comparable<PriorityTask<T>> {
        private final int priority;

        public PriorityTask(final int priority, final Callable<T> tCallable) {
            super(tCallable);

            this.priority = priority;
        }

        public PriorityTask(final int priority, final Runnable runnable,
                final T result) {
            super(runnable, result);

            this.priority = priority;
        }

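        @Override
        public int compareTo(final PriorityTask<T> o) {
            // Widen before subtracting so extreme priorities cannot overflow.
            // Higher priority sorts first.
            final long diff = (long) o.priority - priority;
            return 0 == diff ? 0 : 0 > diff ? -1 : 1;
        }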
    }

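    private static class PriorityTaskComparator
            implements Comparator<Runnable> {
        // Everything that goes through submit() is wrapped by newTaskFor,
        // so the casts are safe there. A Runnable handed straight to
        // execute() is not wrapped and would fail here with a
        // ClassCastException once it has to be compared.
        @Override
        @SuppressWarnings("unchecked")
        public int compare(final Runnable left, final Runnable right) {
            return ((PriorityTask) left).compareTo((PriorityTask) right);
        }
    }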
}

Some points:

  • In JDK5 you will need to add three overrides like this one, one for each submit overload:
        @Override
        public Future<?> submit(final Runnable task) {
            if (task == null)
                throw new NullPointerException();
            final RunnableFuture<Object> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
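    The newTaskFor extension points were added in JDK6, as was RunnableFuture, so a JDK5 port also needs its own newTaskFor methods returning FutureTask.
  • It might be useful to include a ThreadFactory which gives important threads higher scheduling priority. It should wrap the factory passed to the constructor rather than replace it.
  • Remember to take advantage of Thread.NORM_PRIORITY and its cohorts (MIN_PRIORITY and MAX_PRIORITY).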
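
The ThreadFactory idea above could be sketched roughly as follows. PriorityThreadFactory is a hypothetical name, not part of the executor shown earlier, and the sketch assumes Java 8 or later for the lambda in main. It wraps whatever factory the caller supplies and only adjusts the priority of the threads that factory produces. Note that this applies to every worker thread in the pool, not to individual tasks.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical decorator: reuses an existing factory, then sets thread priority. */
final class PriorityThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate;
    private final int threadPriority;

    PriorityThreadFactory(ThreadFactory delegate, int threadPriority) {
        if (delegate == null) {
            throw new NullPointerException("delegate");
        }
        if (threadPriority < Thread.MIN_PRIORITY
                || threadPriority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("priority out of range: " + threadPriority);
        }
        this.delegate = delegate;
        this.threadPriority = threadPriority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Let the wrapped factory pick the name, group and daemon status.
        Thread t = delegate.newThread(r);
        if (t != null) {
            // The JVM silently caps this at the thread group's maximum.
            t.setPriority(threadPriority);
        }
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new PriorityThreadFactory(
                Executors.defaultThreadFactory(), Thread.NORM_PRIORITY + 1);
        Thread t = factory.newThread(() -> { });
        System.out.println(t.getName() + " priority " + t.getPriority());
    }
}
```

An instance could then be passed to the PriorityExecutor(ThreadFactory) constructor. How much the operating system honors Java thread priorities varies by platform, so treat this as a hint to the scheduler at best.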

Callers create their own Runnable and Callable classes which implement Important to communicate their priority. Then simply submit instances of these as usual. Jobs with the highest priority jump to the front of the work queue in the executor waiting for the next free thread.
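
Here is a minimal sketch of what calling code might look like. So that it compiles on its own, it uses a trimmed, single-threaded stand-in for the executor (Runnable path only) instead of the full class above. Job, PriorityDemo and runDemo are hypothetical names made up for the illustration, and the sketch assumes Java 8 or later. A latch keeps the only worker busy so that three jobs pile up in the queue, then the log shows the order in which they ran.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityDemo {
    /** Same contract as the Important interface in the post. */
    interface Important {
        int getPriority();
    }

    /** Hypothetical caller-side job: records its name when it runs. */
    static final class Job implements Runnable, Important {
        private final String name;
        private final int priority;
        private final List<String> log;

        Job(String name, int priority, List<String> log) {
            this.name = name;
            this.priority = priority;
            this.log = log;
        }

        @Override public int getPriority() { return priority; }
        @Override public void run() { log.add(name); }
    }

    /** Trimmed stand-in for the post's PriorityTask (Runnable variant only). */
    static final class PriorityTask<T> extends FutureTask<T>
            implements Comparable<PriorityTask<?>> {
        private final int priority;

        PriorityTask(int priority, Runnable runnable, T value) {
            super(runnable, value);
            this.priority = priority;
        }

        @Override public int compareTo(PriorityTask<?> o) {
            return Integer.compare(o.priority, priority); // higher priority first
        }
    }

    static List<String> runDemo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch gate = new CountDownLatch(1);

        // One worker, so queued jobs run strictly in queue order.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
                int p = r instanceof Important ? ((Important) r).getPriority() : 0;
                return new PriorityTask<T>(p, r, value);
            }
        };

        // Keep the only worker busy so the next three jobs wait in the queue.
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable routine = () -> log.add("routine"); // not Important: priority 0

        pool.submit(blocker);
        pool.submit(new Job("background", -5, log));
        pool.submit(routine);
        pool.submit(new Job("urgent", 10, log));

        gate.countDown(); // release the worker; the queue now drains by priority
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [urgent, routine, background]
    }
}
```

The "urgent" job was submitted last but runs first, and the plain Runnable falls in at the default priority of 0. The typed Runnable variables are deliberate: passing a value-returning lambda straight to submit() can resolve to the Callable overload, which this trimmed stand-in does not wrap.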

7 comments:

Anonymous said...

Starvation?

Anonymous said...

An improvement:
  public int compareTo(final PriorityTask<T> o) {
    final long diff = (long) o.priority - priority;
    return Long.signum(diff);
  }

And I'm probably missing something, but couldn't you get rid of most of the code (i.e., the PriorityTask class and newTaskFor overrides) by making your Comparator smarter? That is, something like this:
  private static class PriorityTaskComparator
    implements Comparator<Runnable> {
    @Override
    public int compare(final Runnable left, final Runnable right) {
      int leftPriority = left instanceof Important ? ((Important) left).getPriority() : 0;
      int rightPriority = right instanceof Important ? ((Important) right).getPriority() : 0;
      long diff = (long) rightPriority - leftPriority; // higher priority first, as in compareTo
      return Long.signum(diff);
    }
  }

Also, ditto the "starvation" comment of the previous commenter; see the PriorityQueue javadocs.

Anonymous said...

Can you give an example of how to use it?

Anonymous said...

Can anyone tell me how to use it, for example submitting tasks with a priority, like exec.submit(,priority);

Brian Oxley said...

Starvation is certainly possible once you introduce priorities. As a practical matter it was a non-issue for me since the "cut in line" tasks came up rarely.

Another option we could have considered was a separate, idle executor for the high priority tasks. I also considered adjusting thread priorities in the thread factory for the executor of the high-priority queue, but in the past got burned using JDK thread priorities. (Perhaps things are better now. My experience was from JDK1.4.)
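
For illustration, the separate-executor option could be sketched along these lines. TwoLaneExecutor and its method names are hypothetical, this is not something I ran in production, and it assumes Java 8 or later. Normal work goes to a fixed pool, while the rare urgent task goes to a cached pool that holds no threads while idle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-lane wrapper: urgent tasks never wait behind normal ones. */
final class TwoLaneExecutor {
    private final ExecutorService normalLane;
    // A cached pool creates threads on demand and lets them expire when idle.
    private final ExecutorService urgentLane = Executors.newCachedThreadPool();

    TwoLaneExecutor(int normalThreads) {
        normalLane = Executors.newFixedThreadPool(normalThreads);
    }

    Future<?> submit(Runnable task, boolean urgent) {
        return (urgent ? urgentLane : normalLane).submit(task);
    }

    void shutdown() {
        normalLane.shutdown();
        urgentLane.shutdown();
    }

    /** Blocks the normal lane, then checks that an urgent task still completes. */
    static boolean urgentRunsWhileNormalLaneIsBusy() {
        TwoLaneExecutor lanes = new TwoLaneExecutor(1);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            lanes.submit(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, false);
            Future<?> urgent = lanes.submit(() -> { }, true);
            urgent.get(5, TimeUnit.SECONDS); // would time out if it had to queue
            return urgent.isDone();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            lanes.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(urgentRunsWhileNormalLaneIsBusy());
    }
}
```

The trade-off is that urgent tasks add threads on top of the normal pool instead of reordering work inside it, so this only makes sense while such tasks stay rare.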

Unknown said...

Brilliant solution, works like a charm.

Unknown said...

Brilliant solution, works like a charm. Thank you.