summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java')
-rw-r--r--core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java42
1 files changed, 30 insertions, 12 deletions
diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 0e6204ddd1..378a849115 100644
--- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -20,6 +20,7 @@
package org.elasticsearch.threadpool;
import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -34,11 +35,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -67,7 +70,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
/**
*
*/
-public class ThreadPool extends AbstractComponent {
+public class ThreadPool extends AbstractComponent implements Closeable {
public static class Names {
public static final String SAME = "same";
@@ -200,6 +203,8 @@ public class ThreadPool extends AbstractComponent {
static final Executor DIRECT_EXECUTOR = command -> command.run();
+ private final ThreadContext threadContext;
+
public ThreadPool(String name) {
this(Settings.builder().put("name", name).build());
}
@@ -208,7 +213,7 @@ public class ThreadPool extends AbstractComponent {
super(settings);
assert settings.get("name") != null : "ThreadPool's settings should contain a name";
-
+ threadContext = new ThreadContext(settings);
Map<String, Settings> groupSettings = THREADPOOL_GROUP_SETTING.get(settings).getAsGroups();
validate(groupSettings);
@@ -448,7 +453,7 @@ public class ThreadPool extends AbstractComponent {
} else {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
}
- Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
+ Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null));
} else if (ThreadPoolType.FIXED == threadPoolType) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
@@ -483,7 +488,7 @@ public class ThreadPool extends AbstractComponent {
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
- Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
+ Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
} else if (ThreadPoolType.SCALING == threadPoolType) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
@@ -527,7 +532,7 @@ public class ThreadPool extends AbstractComponent {
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
}
- Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
+ Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
}
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
@@ -914,17 +919,30 @@ public class ThreadPool extends AbstractComponent {
*/
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
if (pool != null) {
- pool.shutdown();
try {
- if (pool.awaitTermination(timeout, timeUnit)) {
- return true;
+ pool.shutdown();
+ try {
+ if (pool.awaitTermination(timeout, timeUnit)) {
+ return true;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // last resort
+ pool.shutdownNow();
+ } finally {
+ IOUtils.closeWhileHandlingException(pool);
}
- // last resort
- pool.shutdownNow();
}
return false;
}
+
+ @Override
+ public void close() throws IOException {
+ threadContext.close();
+ }
+
+ public ThreadContext getThreadContext() {
+ return threadContext;
+ }
}