diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java | 42 |
1 files changed, 30 insertions, 12 deletions
diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 0e6204ddd1..378a849115 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -20,6 +20,7 @@ package org.elasticsearch.threadpool; import org.apache.lucene.util.Counter; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -34,11 +35,13 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsAbortPolicy; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -67,7 +70,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; /** * */ -public class ThreadPool extends AbstractComponent { +public class ThreadPool extends AbstractComponent implements Closeable { public static class Names { public static final String SAME = "same"; @@ -200,6 +203,8 @@ public class ThreadPool extends AbstractComponent { static final Executor DIRECT_EXECUTOR = command -> command.run(); + private final ThreadContext threadContext; + public ThreadPool(String name) { this(Settings.builder().put("name", name).build()); } @@ -208,7 +213,7 @@ public class ThreadPool extends AbstractComponent { super(settings); assert settings.get("name") != null : "ThreadPool's settings should contain a name"; - + threadContext = new ThreadContext(settings); Map<String, Settings> groupSettings = THREADPOOL_GROUP_SETTING.get(settings).getAsGroups(); validate(groupSettings); @@ -448,7 +453,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } - Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null)); } else if (ThreadPoolType.FIXED == threadPoolType) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); @@ -483,7 +488,7 @@ public class ThreadPool extends AbstractComponent { int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize)); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); - Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); + Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize)); } else if (ThreadPoolType.SCALING == threadPoolType) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); @@ -527,7 +532,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } - Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); @@ -914,17 +919,30 @@ public class ThreadPool extends AbstractComponent { */ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) { if (pool != null) { - pool.shutdown(); try { - if (pool.awaitTermination(timeout, timeUnit)) { - return true; + pool.shutdown(); + try { + if (pool.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // last resort + pool.shutdownNow(); + } finally { + IOUtils.closeWhileHandlingException(pool); } - // last resort - pool.shutdownNow(); } return false; } + + @Override + public void close() throws IOException { + threadContext.close(); + } + + public ThreadContext getThreadContext() { + return threadContext; + } } |