summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java')
-rw-r--r--core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java7
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
index 8df17f7323..6732b26ddb 100644
--- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
+++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
@@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException;
@@ -64,9 +65,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
protected final String profileName;
+ private final ThreadContext threadContext;
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
this.threadPool = transport.threadPool();
+ this.threadContext = threadPool.getThreadContext();
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.logger = logger;
@@ -101,7 +104,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
// buffer, or in the cumlation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
boolean success = false;
- try {
+ try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
long requestId = streamIn.readLong();
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
@@ -123,8 +126,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
streamIn = compressor.streamInput(streamIn);
}
streamIn.setVersion(version);
-
if (TransportStatus.isRequest(status)) {
+ threadContext.readHeaders(streamIn);
String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);
// Chek the entire message has been read