diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java | 54 |
1 files changed, 31 insertions, 23 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index d0d2906dee..f55c84e943 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -47,8 +47,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private AtomicLong insertionOrder = new AtomicLong(); private Queue<Runnable> current = ConcurrentCollections.newQueue(); - PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory); + PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder); } public Pending[] getPending() { @@ -88,10 +88,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { for (Runnable runnable : runnables) { if (runnable instanceof TieBreakingPrioritizedRunnable) { TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable; - pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing)); + pending.add(new Pending(unwrap(t.runnable), t.priority(), t.insertionOrder, executing)); } else if (runnable instanceof PrioritizedFutureTask) { PrioritizedFutureTask t = (PrioritizedFutureTask) runnable; - pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing)); + Object task = t.task; + if (t.task instanceof Runnable) { + task = unwrap((Runnable) t.task); + } + pending.add(new Pending(task, t.priority, t.insertionOrder, executing)); } } } @@ -107,12 +111,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) { - if (command instanceof PrioritizedRunnable) { - command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet()); - } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper... - command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet()); - } - super.execute(command); + command = wrapRunnable(command); + doExecute(command); if (timeout.nanos() >= 0) { if (command instanceof TieBreakingPrioritizedRunnable) { ((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout); @@ -125,21 +125,31 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } @Override - public void execute(Runnable command) { + protected Runnable wrapRunnable(Runnable command) { if (command instanceof PrioritizedRunnable) { - command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet()); - } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper... - command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet()); + if ((command instanceof TieBreakingPrioritizedRunnable)) { + return command; + } + Priority priority = ((PrioritizedRunnable) command).priority(); + return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet()); + } else if (command instanceof PrioritizedFutureTask) { + return command; + } else { // it might be a callable wrapper... + if (command instanceof TieBreakingPrioritizedRunnable) { + return command; + } + return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet()); } - super.execute(command); } + @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { if (!(runnable instanceof PrioritizedRunnable)) { runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL); } - return new PrioritizedFutureTask<>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet()); + Priority priority = ((PrioritizedRunnable) runnable).priority(); + return new PrioritizedFutureTask<>(runnable, priority, value, insertionOrder.incrementAndGet()); } @Override @@ -147,7 +157,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { if (!(callable instanceof PrioritizedCallable)) { callable = PrioritizedCallable.wrap(callable, Priority.NORMAL); } - return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet()); + return new PrioritizedFutureTask<>((PrioritizedCallable)callable, insertionOrder.incrementAndGet()); } public static class Pending { @@ -173,10 +183,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private ScheduledFuture<?> timeoutFuture; private boolean started = false; - TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) { - this(runnable, runnable.priority(), insertionOrder); - } - TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) { super(priority); this.runnable = runnable; @@ -233,6 +239,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { runnable = null; timeoutFuture = null; } + } } @@ -242,10 +249,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { final Priority priority; final long insertionOrder; - public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) { + public PrioritizedFutureTask(Runnable runnable, Priority priority, T value, long insertionOrder) { super(runnable, value); this.task = runnable; - this.priority = runnable.priority(); + this.priority = priority; this.insertionOrder = insertionOrder; } @@ -265,4 +272,5 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { return insertionOrder < pft.insertionOrder ? -1 : 1; } } + } |