diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java | 31 |
1 files changed, 15 insertions, 16 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index ba067fdabd..d3957245bc 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.ConnectTransportException; @@ -72,7 +73,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport { public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport"; - private final ThreadPool threadPool; + final ThreadPool threadPool; private final ThreadPoolExecutor workers; private final Version version; private volatile TransportServiceAdapter transportServiceAdapter; @@ -96,7 +97,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); - this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory); + this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext()); this.namedWriteableRegistry = namedWriteableRegistry; } @@ -209,6 +210,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem status = TransportStatus.setRequest(status); stream.writeByte(status); // 0 for request, 1 for response. + threadPool.getThreadContext().writeTo(stream); stream.writeString(action); request.writeTo(stream); @@ -220,12 +222,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem } final byte[] data = stream.bytes().toBytes(); - transportServiceAdapter.sent(data.length); transportServiceAdapter.onRequestSent(node, requestId, action, request, options); - targetTransport.workers().execute(new Runnable() { - @Override - public void run() { + targetTransport.workers().execute(() -> { + ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext context = threadContext.stashContext()) { targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); } }); @@ -246,8 +247,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem long requestId = stream.readLong(); byte status = stream.readByte(); boolean isRequest = TransportStatus.isRequest(status); - if (isRequest) { + ThreadContext threadContext = threadPool.getThreadContext(); + threadContext.readHeaders(stream); handleRequest(stream, requestId, sourceTransport, version); } else { final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); @@ -322,6 +324,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem logger.warn("Failed to send error message back to client for action [" + action + "]", e); logger.warn("Actual Exception", e1); } + } } @@ -338,15 +341,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem } protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) { - threadPool.executor(handler.executor()).execute(new Runnable() { - @SuppressWarnings({"unchecked"}) - @Override - public void run() { - try { - handler.handleResponse(response); - } catch (Throwable e) { - handleException(handler, new ResponseHandlerFailureTransportException(e)); - } + threadPool.executor(handler.executor()).execute(() -> { + try { + handler.handleResponse(response); + } catch (Throwable e) { + handleException(handler, new ResponseHandlerFailureTransportException(e)); } }); } |