summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/transport/TransportService.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/transport/TransportService.java')
-rw-r--r--core/src/main/java/org/elasticsearch/transport/TransportService.java39
1 files changed, 38 insertions, 1 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index 5d74c4a408..8cff05a4d6 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -40,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -288,7 +289,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} else {
timeoutHandler = new TimeoutHandler(requestId);
}
- clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
+ clientHandlers.put(requestId, new RequestHolder<>(new ContextRestoreResponseHandler<T>(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler));
if (started.get() == false) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
// it will only notify if the toStop code hasn't done the work yet.
@@ -494,6 +495,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
public TransportResponseHandler onResponseReceived(final long requestId) {
RequestHolder holder = clientHandlers.remove(requestId);
+
if (holder == null) {
checkForTimeout(requestId);
return null;
@@ -708,6 +710,41 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
+ /**
+ * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the4 handle methods
+ * are invoked we restore the context.
+ */
+ private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
+ private final TransportResponseHandler<T> delegate;
+ private final ThreadContext.StoredContext threadContext;
+ private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) {
+ this.delegate = delegate;
+ this.threadContext = threadContext;
+ }
+
+ @Override
+ public T newInstance() {
+ return delegate.newInstance();
+ }
+
+ @Override
+ public void handleResponse(T response) {
+ threadContext.restore();
+ delegate.handleResponse(response);
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ threadContext.restore();
+ delegate.handleException(exp);
+ }
+
+ @Override
+ public String executor() {
+ return delegate.executor();
+ }
+ }
+
static class DirectResponseChannel implements TransportChannel {
final ESLogger logger;
final DiscoveryNode localNode;