My server has an ExecutorService for job scheduling. It came up that we need to have some jobs "jump the queue", that is, go to the front of the line waiting for the next free thread. What to do?
A web search came up empty. After hemming and hawing, and polling other teams, I went with this solution (below). Comments and improvements welcome.
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

public class PriorityExecutor extends ThreadPoolExecutor {
    public PriorityExecutor() {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()));
    }

    public PriorityExecutor(final ThreadFactory threadFactory) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()),
                threadFactory);
    }

    public PriorityExecutor(final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()),
                handler);
    }

    public PriorityExecutor(final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, 60L, SECONDS,
                new PriorityBlockingQueue<Runnable>(11, new PriorityTaskComparator()),
                threadFactory, handler);
    }

    // Wrap every submitted Callable in a PriorityTask; default priority is 0.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        if (callable instanceof Important)
            return new PriorityTask<T>(((Important) callable).getPriority(), callable);
        else
            return new PriorityTask<T>(0, callable);
    }

    // Same for Runnables.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        if (runnable instanceof Important)
            return new PriorityTask<T>(((Important) runnable).getPriority(), runnable, value);
        else
            return new PriorityTask<T>(0, runnable, value);
    }

    public interface Important {
        int getPriority();
    }

    private static final class PriorityTask<T> extends FutureTask<T>
            implements Comparable<PriorityTask<T>> {
        private final int priority;

        public PriorityTask(final int priority, final Callable<T> tCallable) {
            super(tCallable);
            this.priority = priority;
        }

        public PriorityTask(final int priority, final Runnable runnable, final T result) {
            super(runnable, result);
            this.priority = priority;
        }

        // Higher priority sorts first. Widen to long before subtracting so
        // extreme int priorities cannot overflow.
        @Override
        public int compareTo(final PriorityTask<T> o) {
            final long diff = (long) o.priority - priority;
            return 0 == diff ? 0 : 0 > diff ? -1 : 1;
        }
    }

    private static class PriorityTaskComparator implements Comparator<Runnable> {
        // Tasks must arrive through submit(); a bare Runnable handed to
        // execute() is not wrapped in a PriorityTask and would fail this cast.
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public int compare(final Runnable left, final Runnable right) {
            return ((PriorityTask) left).compareTo((PriorityTask) right);
        }
    }
}
Some points:
- In JDK5 you will need to add three overrides like this one, one for each submit overload:
    @Override
    public Future<?> submit(final Runnable task) {
        if (task == null) throw new NullPointerException();
        final RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
  The newTaskFor extension points were added in JDK6.
- It might be useful to include a ThreadFactory which gives important threads higher scheduling priority. It should reuse a factory from the constructor.
- Remember to take advantage of Thread.NORM_PRIORITY and its cohorts.
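The ThreadFactory idea in the points above could look roughly like the following. This is only a sketch: PriorityThreadFactory is a made-up name, not part of the JDK or of the class above, and it raises the priority of every thread the pool creates rather than of individual tasks. Thread priorities are also just a hint to the scheduler, so treat any effect as platform-dependent.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical wrapper: reuses an existing factory, then adjusts the thread priority. */
final class PriorityThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate;
    private final int priority;

    PriorityThreadFactory(final ThreadFactory delegate, final int priority) {
        if (delegate == null) throw new NullPointerException("delegate");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY)
            throw new IllegalArgumentException("priority out of range: " + priority);
        this.delegate = delegate;
        this.priority = priority;
    }

    @Override
    public Thread newThread(final Runnable r) {
        // Let the wrapped factory do naming, daemon status, etc.
        final Thread t = delegate.newThread(r);
        if (t != null) t.setPriority(priority);
        return t;
    }

    public static void main(final String[] args) {
        final ThreadFactory factory = new PriorityThreadFactory(
                Executors.defaultThreadFactory(), Thread.NORM_PRIORITY + 1);
        final Thread t = factory.newThread(() -> { });
        System.out.println("priority = " + t.getPriority());
    }
}
```

An instance could then be passed to the PriorityExecutor(ThreadFactory) constructor.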
Callers create their own Runnable and Callable classes which implement Important to communicate their priority. Then simply submit instances of these as usual. Jobs with the highest priority jump to the front of the work queue in the executor waiting for the next free thread.
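To make the calling side concrete, here is a self-contained sketch of the same technique. It is a condensed stand-in, not the class from this post: Job, PrioritySubmitSketch and newExecutor are hypothetical names, and the pool is pinned to a single worker thread (an assumption made only so that jobs actually pile up in the queue and the ordering becomes visible).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PrioritySubmitSketch {
    /** Same shape as the Important interface in the post. */
    interface Important { int getPriority(); }

    /** Hypothetical job: logs its name when run and reports its own priority. */
    static final class Job implements Runnable, Important {
        private final String name;
        private final int priority;
        private final List<String> log;
        Job(String name, int priority, List<String> log) {
            this.name = name; this.priority = priority; this.log = log;
        }
        @Override public void run() { log.add(name); }
        @Override public int getPriority() { return priority; }
    }

    /** Condensed PriorityTask: higher priority sorts first. */
    static final class PriorityTask<T> extends FutureTask<T>
            implements Comparable<PriorityTask<?>> {
        private final int priority;
        PriorityTask(int priority, Runnable r, T value) { super(r, value); this.priority = priority; }
        @Override public int compareTo(PriorityTask<?> o) { return Integer.compare(o.priority, priority); }
    }

    /** Single worker thread (an assumption of this sketch) over a priority queue. */
    static ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
                int p = (r instanceof Important) ? ((Important) r).getPriority() : 0;
                return new PriorityTask<T>(p, r, value);
            }
        };
    }

    /** Submits three jobs behind a blocker and returns the order they ran in. */
    static List<String> runDemo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor exec = newExecutor();
        Runnable blocker = () -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        exec.submit(blocker);                    // keeps the only worker busy
        exec.submit(new Job("low", -5, log));    // these three wait in the queue
        exec.submit(new Job("normal", 0, log));
        exec.submit(new Job("high", 10, log));
        gate.countDown();                        // release the worker
        exec.shutdown();
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Because all three jobs are queued before the worker is released, they run in priority order (high, normal, low) rather than submission order.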
7 comments:
Starvation?
An improvement:
    public int compareTo(final PriorityTask<T> o) {
        final long diff = (long) o.priority - priority;
        return Long.signum(diff);
    }
And I'm probably missing something, but couldn't you get rid of most of the code (i.e., the PriorityTask class and newTaskFor overrides) by making your Comparator smarter? That is, something like this:
    private static class PriorityTaskComparator
            implements Comparator<Runnable> {
        @Override
        public int compare(final Runnable left, final Runnable right) {
            int leftPriority = left instanceof Important ? ((Important) left).getPriority() : 0;
            int rightPriority = right instanceof Important ? ((Important) right).getPriority() : 0;
            long diff = (long) rightPriority - leftPriority;
            return Long.signum(diff);
        }
    }
Also, ditto the "starvation" comment of the previous commenter; see the PriorityQueue javadocs.
Can you give an example of how to use it?
Can anyone tell me how to use it, for example submitting tasks with a priority like exec.submit(,priority)?
Starvation is certainly possible once you introduce priorities. As a practical matter it was a non-issue for me since the "cut in line" tasks came up rarely.
Another option we could have considered was a separate, idle executor for the high priority tasks. I also considered adjusting thread priorities in the thread factory for the executor of the high-priority queue, but in the past got burned using JDK thread priorities. (Perhaps things are better now. My experience was from JDK1.4.)
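For comparison, the "separate executor" alternative mentioned above might be sketched like this. TwoLaneExecutor and its method names are invented for illustration, and the lane sizes are arbitrary assumptions; the point is only that urgent work never waits behind the normal queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-lane scheduler: a normal pool plus a mostly idle express pool. */
final class TwoLaneExecutor {
    private final ExecutorService normalLane;
    private final ExecutorService expressLane;

    TwoLaneExecutor(final int normalThreads) {
        normalLane = Executors.newFixedThreadPool(normalThreads);
        expressLane = Executors.newSingleThreadExecutor();
    }

    /** Urgent tasks bypass the normal queue entirely. */
    Future<?> submit(final Runnable task, final boolean urgent) {
        return (urgent ? expressLane : normalLane).submit(task);
    }

    void shutdown() {
        normalLane.shutdown();
        expressLane.shutdown();
    }

    /** Helper for the demo: block until the latch opens. */
    static void awaitQuietly(final CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(final String[] args) throws Exception {
        final TwoLaneExecutor exec = new TwoLaneExecutor(1);
        final CountDownLatch gate = new CountDownLatch(1);
        exec.submit(() -> awaitQuietly(gate), false);   // normal lane is now busy
        final Future<?> urgent = exec.submit(
                () -> System.out.println("urgent job ran"), true);
        urgent.get(5, TimeUnit.SECONDS);                // completes despite the busy lane
        gate.countDown();
        exec.shutdown();
    }
}
```

The trade-off is an extra thread that sits idle most of the time, in exchange for no comparator, no task wrapping, and no starvation of normal jobs by the queue ordering itself.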
Brilliant solution, works like a charm.
Brilliant solution, works like a charm. Thank you.