My server has an ExecutorService
for job scheduling. It came up that we need to have some jobs "jump the queue", that is, go to the front of the line waiting for the next free thread. What to do?
A web search came up empty. After hemming and hawing, and polling other teams, I went with this solution (below). Comments and improvements welcome.
public class PriorityExecutor extends ThreadPoolExecutor { public PriorityExecutor() { super(0, Integer.MAX_VALUE, 60L, SECONDS, new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator())); } public PriorityExecutor(final ThreadFactory threadFactory) { super(0, Integer.MAX_VALUE, 60L, SECONDS, new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()), threadFactory); } public PriorityExecutor(final RejectedExecutionHandler handler) { super(0, Integer.MAX_VALUE, 60L, SECONDS, new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()), handler); } public PriorityExecutor(final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { super(0, Integer.MAX_VALUE, 60L, SECONDS, new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()), threadFactory, handler); } @Override protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) { if (callable instanceof Important) return new PriorityTask<T>(((Important) callable).getPriority(), callable); else return new PriorityTask<T>(0, callable); } @Override protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { if (runnable instanceof Important) return new PriorityTask<T>(((Important) runnable).getPriority(), runnable, value); else return new PriorityTask<T>(0, runnable, value); } public interface Important { int getPriority(); } private static final class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> { private final int priority; public PriorityTask(final int priority, final Callable<T> tCallable) { super(tCallable); this.priority = priority; } public PriorityTask(final int priority, final Runnable runnable, final T result) { super(runnable, result); this.priority = priority; } @Override public int compareTo(final PriorityTask<T> o) { final long diff = o.priority - priority; return 0 == diff ? 0 : 0 > diff ? -1 : 1; } } private static class PriorityTaskComparator implements Comparator<Runnable> { @Override public int compare(final Runnable left, final Runnable right) { return ((PriorityTask) left).compareTo((PriorityTask) right); } } }
Some points:
- In JDK5 you will need to add three of:
@Override public Future<?> submit(final Runnable task) { if (task == null) throw new NullPointerException(); final RunnableFuture<Object> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
These extension points were added in JDK6. - It might be useful to include a
ThreadFactory
which gives important threads higher scheduling priority. It should reuse a factory from the constructor. - Remember to take advantage of
NORM_PRIORITY
and his cohorts.
Callers create their own Runnable
and Callable
classes which implement Important
to communicate their priority. Then simply submit instances of these as usual. Jobs with the highest priority jump to the front of the work queue in the executor waiting for the next free thread.