diff options
author | Tim Brooks <tim@uncontended.net> | 2017-06-29 15:16:07 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-29 15:16:07 -0500 |
commit | cac2eec7d298f5e3fcafde73bb975028bfd36741 (patch) | |
tree | 30cfa43f620c2b3227575246dc0e39d082b2db67 /test/framework/src/main | |
parent | c32c21e875f00a175863c8c0033c2599dcf9bf78 (diff) |
Add NioTransport threads to thread name checks (#25477)
We have various assertions that check we never block on transport
threads. This commit adds the thread names for the NioTransport to
these assertions.
With this change I had to fix two places where we were calling blocking
methods from the transport threads.
Diffstat (limited to 'test/framework/src/main')
3 files changed, 14 insertions, 10 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 05c818476a..8b0d435a08 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; @@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class NioTransport extends TcpTransport<NioChannel> { - // TODO: Need to add to places where we check if transport thread - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting<Integer> NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", @@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport<NioChannel> { for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { try { - channel.closeAsync().awaitClose(); + // If we are currently on the selector thread that handles this channel, we should prefer + // the closeFromSelector method. This method always closes the channel immediately. + ESSelector selector = channel.getSelector(); + if (selector != null && selector.isOnCurrentThread()) { + channel.closeFromSelector(); + } else { + channel.closeAsync().awaitClose(); + } } catch (Exception e) { if (closingExceptions == null) { closingExceptions = new IOException("failed to close channels"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index be8dbe3f46..9792f9e64c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -102,11 +102,6 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh */ @Override public CloseFuture closeAsync() { - if (selector != null && selector.isOnCurrentThread()) { - closeFromSelector(); - return closeFuture; - } - for (; ; ) { int state = this.state.get(); if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java index 4bc1ca6043..1675c7326e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java @@ -80,12 +80,14 @@ public class ConnectFuture extends BaseFuture<NioSocketChannel> { if (isDone()) { try { // Get should always return without blocking as we already checked 'isDone' - return super.get(); + return super.get(0, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { return null; + } catch (TimeoutException e) { + throw new AssertionError("This should never happen as we only call get() after isDone() is true."); } } else { return null; |