From cac2eec7d298f5e3fcafde73bb975028bfd36741 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 15:16:07 -0500 Subject: Add NioTransport threads to thread name checks (#25477) We have various assertions that check we never block on transport threads. This commit adds the thread names for the NioTransport to these assertions. With this change I had to fix two places where we were calling blocking methods from the transport threads. --- .../org/elasticsearch/transport/nio/NioTransport.java | 15 +++++++++++---- .../transport/nio/channel/AbstractNioChannel.java | 5 ----- .../transport/nio/channel/ConnectFuture.java | 4 +++- 3 files changed, 14 insertions(+), 10 deletions(-) (limited to 'test/framework/src/main') diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 05c818476a..8b0d435a08 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; @@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class NioTransport extends TcpTransport { - // TODO: Need to add to places where we check if transport thread - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", @@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport { for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { try { - channel.closeAsync().awaitClose(); + // If we are currently on the selector thread that handles this channel, we should prefer + // the closeFromSelector method. This method always closes the channel immediately. + ESSelector selector = channel.getSelector(); + if (selector != null && selector.isOnCurrentThread()) { + channel.closeFromSelector(); + } else { + channel.closeAsync().awaitClose(); + } } catch (Exception e) { if (closingExceptions == null) { closingExceptions = new IOException("failed to close channels"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index be8dbe3f46..9792f9e64c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -102,11 +102,6 @@ public abstract class AbstractNioChannel { if (isDone()) { try { // Get should always return without blocking as we already checked 'isDone' - return super.get(); + return super.get(0, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { return null; + } catch (TimeoutException e) { + throw new AssertionError("This should never happen as we only call get() after isDone() is true."); } } else { return null; -- cgit v1.2.3