summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java')
-rw-r--r--core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java31
1 files changed, 15 insertions, 16 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
index ba067fdabd..d3957245bc 100644
--- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
+++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
@@ -72,7 +73,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport";
- private final ThreadPool threadPool;
+ final ThreadPool threadPool;
private final ThreadPoolExecutor workers;
private final Version version;
private volatile TransportServiceAdapter transportServiceAdapter;
@@ -96,7 +97,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
- this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory);
+ this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext());
this.namedWriteableRegistry = namedWriteableRegistry;
}
@@ -209,6 +210,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
status = TransportStatus.setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
+ threadPool.getThreadContext().writeTo(stream);
stream.writeString(action);
request.writeTo(stream);
@@ -220,12 +222,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
final byte[] data = stream.bytes().toBytes();
-
transportServiceAdapter.sent(data.length);
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
- targetTransport.workers().execute(new Runnable() {
- @Override
- public void run() {
+ targetTransport.workers().execute(() -> {
+ ThreadContext threadContext = threadPool.getThreadContext();
+ try (ThreadContext.StoredContext context = threadContext.stashContext()) {
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
}
});
@@ -246,8 +247,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
long requestId = stream.readLong();
byte status = stream.readByte();
boolean isRequest = TransportStatus.isRequest(status);
-
if (isRequest) {
+ ThreadContext threadContext = threadPool.getThreadContext();
+ threadContext.readHeaders(stream);
handleRequest(stream, requestId, sourceTransport, version);
} else {
final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
@@ -322,6 +324,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
+
}
}
@@ -338,15 +341,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) {
- threadPool.executor(handler.executor()).execute(new Runnable() {
- @SuppressWarnings({"unchecked"})
- @Override
- public void run() {
- try {
- handler.handleResponse(response);
- } catch (Throwable e) {
- handleException(handler, new ResponseHandlerFailureTransportException(e));
- }
+ threadPool.executor(handler.executor()).execute(() -> {
+ try {
+ handler.handleResponse(response);
+ } catch (Throwable e) {
+ handleException(handler, new ResponseHandlerFailureTransportException(e));
}
});
}