My server has an ExecutorService for job scheduling. It came up that we need to have some jobs "jump the queue", that is, go to the front of the line waiting for the next free thread. What to do?
A web search came up empty. After hemming and hawing, and polling other teams, I went with this solution (below). Comments and improvements welcome.
public class PriorityExecutor
extends ThreadPoolExecutor {
public PriorityExecutor() {
super(0, Integer.MAX_VALUE, 60L, SECONDS,
new PriorityBlockingQueue<Runnable>(11,
new PriorityTaskComparator()));
}
public PriorityExecutor(final ThreadFactory threadFactory) {
super(0, Integer.MAX_VALUE, 60L, SECONDS,
new PriorityBlockingQueue<Runnable>(11,
new PriorityTaskComparator()), threadFactory);
}
public PriorityExecutor(final RejectedExecutionHandler handler) {
super(0, Integer.MAX_VALUE, 60L, SECONDS,
new PriorityBlockingQueue<Runnable>(11,
new PriorityTaskComparator()), handler);
}
public PriorityExecutor(final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(0, Integer.MAX_VALUE, 60L, SECONDS,
new PriorityBlockingQueue<Runnable>(11,
new PriorityTaskComparator()), threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
if (callable instanceof Important)
return new PriorityTask<T>(((Important) callable).getPriority(),
callable);
else
return new PriorityTask<T>(0, callable);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable,
final T value) {
if (runnable instanceof Important)
return new PriorityTask<T>(((Important) runnable).getPriority(),
runnable, value);
else
return new PriorityTask<T>(0, runnable, value);
}
public interface Important {
int getPriority();
}
private static final class PriorityTask<T>
extends FutureTask<T>
implements Comparable<PriorityTask<T>> {
private final int priority;
public PriorityTask(final int priority, final Callable<T> tCallable) {
super(tCallable);
this.priority = priority;
}
public PriorityTask(final int priority, final Runnable runnable,
final T result) {
super(runnable, result);
this.priority = priority;
}
@Override
public int compareTo(final PriorityTask<T> o) {
final long diff = o.priority - priority;
return 0 == diff ? 0 : 0 > diff ? -1 : 1;
}
}
private static class PriorityTaskComparator
implements Comparator<Runnable> {
@Override
public int compare(final Runnable left, final Runnable right) {
return ((PriorityTask) left).compareTo((PriorityTask) right);
}
}
} Some points:
- In JDK5 you will need to add three of:
@Override public Future<?> submit(final Runnable task) { if (task == null) throw new NullPointerException(); final RunnableFuture<Object> ftask = newTaskFor(task, null); execute(ftask); return ftask; }These extension points were added in JDK6. - It might be useful to include a
ThreadFactorywhich gives important threads higher scheduling priority. It should reuse a factory from the constructor. - Remember to take advantage of
NORM_PRIORITYand his cohorts.
Callers create their own Runnable and Callable classes which implement Important to communicate their priority. Then simply submit instances of these as usual. Jobs with the highest priority jump to the front of the work queue in the executor waiting for the next free thread.