diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/transport/TransportService.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/transport/TransportService.java | 39 |
1 files changed, 38 insertions, 1 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 5d74c4a408..8cff05a4d6 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -288,7 +289,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic } else { timeoutHandler = new TimeoutHandler(requestId); } - clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler)); + clientHandlers.put(requestId, new RequestHolder<>(new ContextRestoreResponseHandler<T>(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler)); if (started.get() == false) { // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller. // it will only notify if the toStop code hasn't done the work yet. @@ -494,6 +495,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic @Override public TransportResponseHandler onResponseReceived(final long requestId) { RequestHolder holder = clientHandlers.remove(requestId); + if (holder == null) { checkForTimeout(requestId); return null; @@ -708,6 +710,41 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic } } + /** + * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the4 handle methods + * are invoked we restore the context. + */ + private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> { + private final TransportResponseHandler<T> delegate; + private final ThreadContext.StoredContext threadContext; + private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) { + this.delegate = delegate; + this.threadContext = threadContext; + } + + @Override + public T newInstance() { + return delegate.newInstance(); + } + + @Override + public void handleResponse(T response) { + threadContext.restore(); + delegate.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + threadContext.restore(); + delegate.handleException(exp); + } + + @Override + public String executor() { + return delegate.executor(); + } + } + static class DirectResponseChannel implements TransportChannel { final ESLogger logger; final DiscoveryNode localNode; |