summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java')
-rw-r--r--core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java54
1 files changed, 31 insertions, 23 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
index d0d2906dee..f55c84e943 100644
--- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
+++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
@@ -47,8 +47,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();
- PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
- super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
+ PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
+ super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
}
public Pending[] getPending() {
@@ -88,10 +88,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
- pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing));
+ pending.add(new Pending(unwrap(t.runnable), t.priority(), t.insertionOrder, executing));
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
- pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing));
+ Object task = t.task;
+ if (t.task instanceof Runnable) {
+ task = unwrap((Runnable) t.task);
+ }
+ pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
}
}
}
@@ -107,12 +111,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
- if (command instanceof PrioritizedRunnable) {
- command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
- } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
- command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
- }
- super.execute(command);
+ command = wrapRunnable(command);
+ doExecute(command);
if (timeout.nanos() >= 0) {
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
@@ -125,21 +125,31 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
@Override
- public void execute(Runnable command) {
+ protected Runnable wrapRunnable(Runnable command) {
if (command instanceof PrioritizedRunnable) {
- command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
- } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
- command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
+ if ((command instanceof TieBreakingPrioritizedRunnable)) {
+ return command;
+ }
+ Priority priority = ((PrioritizedRunnable) command).priority();
+ return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
+ } else if (command instanceof PrioritizedFutureTask) {
+ return command;
+ } else { // it might be a callable wrapper...
+ if (command instanceof TieBreakingPrioritizedRunnable) {
+ return command;
+ }
+ return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
}
- super.execute(command);
}
+
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
- return new PrioritizedFutureTask<>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
+ Priority priority = ((PrioritizedRunnable) runnable).priority();
+ return new PrioritizedFutureTask<>(runnable, priority, value, insertionOrder.incrementAndGet());
}
@Override
@@ -147,7 +157,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
- return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
+ return new PrioritizedFutureTask<>((PrioritizedCallable)callable, insertionOrder.incrementAndGet());
}
public static class Pending {
@@ -173,10 +183,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private ScheduledFuture<?> timeoutFuture;
private boolean started = false;
- TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
- this(runnable, runnable.priority(), insertionOrder);
- }
-
TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) {
super(priority);
this.runnable = runnable;
@@ -233,6 +239,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
runnable = null;
timeoutFuture = null;
}
+
}
}
@@ -242,10 +249,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
final Priority priority;
final long insertionOrder;
- public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) {
+ public PrioritizedFutureTask(Runnable runnable, Priority priority, T value, long insertionOrder) {
super(runnable, value);
this.task = runnable;
- this.priority = runnable.priority();
+ this.priority = priority;
this.insertionOrder = insertionOrder;
}
@@ -265,4 +272,5 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
return insertionOrder < pft.insertionOrder ? -1 : 1;
}
}
+
}