summaryrefslogtreecommitdiff
path: root/test/framework/src/main
diff options
context:
space:
mode:
authorTim Brooks <tim@uncontended.net>2017-06-29 15:16:07 -0500
committerGitHub <noreply@github.com>2017-06-29 15:16:07 -0500
commitcac2eec7d298f5e3fcafde73bb975028bfd36741 (patch)
tree30cfa43f620c2b3227575246dc0e39d082b2db67 /test/framework/src/main
parentc32c21e875f00a175863c8c0033c2599dcf9bf78 (diff)
Add NioTransport threads to thread name checks (#25477)
We have various assertions that check we never block on transport threads. This commit adds the thread names for the NioTransport to these assertions. With this change I had to fix two places where we were calling blocking methods from the transport threads.
Diffstat (limited to 'test/framework/src/main')
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java15
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java5
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java4
3 files changed, 14 insertions, 10 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
index 05c818476a..8b0d435a08 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
@@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
+import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
@@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NioTransport extends TcpTransport<NioChannel> {
- // TODO: Need to add to places where we check if transport thread
- public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
- public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor";
+ public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
+ public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
@@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport<NioChannel> {
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
try {
- channel.closeAsync().awaitClose();
+ // If we are currently on the selector thread that handles this channel, we should prefer
+ // the closeFromSelector method. This method always closes the channel immediately.
+ ESSelector selector = channel.getSelector();
+ if (selector != null && selector.isOnCurrentThread()) {
+ channel.closeFromSelector();
+ } else {
+ channel.closeAsync().awaitClose();
+ }
} catch (Exception e) {
if (closingExceptions == null) {
closingExceptions = new IOException("failed to close channels");
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
index be8dbe3f46..9792f9e64c 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
@@ -102,11 +102,6 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
*/
@Override
public CloseFuture closeAsync() {
- if (selector != null && selector.isOnCurrentThread()) {
- closeFromSelector();
- return closeFuture;
- }
-
for (; ; ) {
int state = this.state.get();
if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) {
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
index 4bc1ca6043..1675c7326e 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
@@ -80,12 +80,14 @@ public class ConnectFuture extends BaseFuture<NioSocketChannel> {
if (isDone()) {
try {
// Get should always return without blocking as we already checked 'isDone'
- return super.get();
+ return super.get(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
return null;
+ } catch (TimeoutException e) {
+ throw new AssertionError("This should never happen as we only call get() after isDone() is true.");
}
} else {
return null;