summaryrefslogtreecommitdiff
path: root/test/framework/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'test/framework/src/main')
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java15
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java5
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java4
3 files changed, 14 insertions, 10 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
index 05c818476a..8b0d435a08 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java
@@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
+import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
@@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NioTransport extends TcpTransport<NioChannel> {
- // TODO: Need to add to places where we check if transport thread
- public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
- public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor";
+ public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
+ public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
@@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport<NioChannel> {
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
try {
- channel.closeAsync().awaitClose();
+ // If we are currently on the selector thread that handles this channel, we should prefer
+ // the closeFromSelector method. This method always closes the channel immediately.
+ ESSelector selector = channel.getSelector();
+ if (selector != null && selector.isOnCurrentThread()) {
+ channel.closeFromSelector();
+ } else {
+ channel.closeAsync().awaitClose();
+ }
} catch (Exception e) {
if (closingExceptions == null) {
closingExceptions = new IOException("failed to close channels");
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
index be8dbe3f46..9792f9e64c 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java
@@ -102,11 +102,6 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
*/
@Override
public CloseFuture closeAsync() {
- if (selector != null && selector.isOnCurrentThread()) {
- closeFromSelector();
- return closeFuture;
- }
-
for (; ; ) {
int state = this.state.get();
if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) {
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
index 4bc1ca6043..1675c7326e 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java
@@ -80,12 +80,14 @@ public class ConnectFuture extends BaseFuture<NioSocketChannel> {
if (isDone()) {
try {
// Get should always return without blocking as we already checked 'isDone'
- return super.get();
+ return super.get(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
return null;
+ } catch (TimeoutException e) {
+ throw new AssertionError("This should never happen as we only call get() after isDone() is true.");
}
} else {
return null;