summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java12
1 files changed, 10 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 0014404057..3c3ed714b5 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@@ -302,7 +303,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
- private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
+ private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
@@ -313,9 +314,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
+ final ThreadContext threadContext = threadPool.getThreadContext();
+ final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
+ context.close();
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
@@ -411,7 +415,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
ReroutePhase(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
- this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
+ this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
}
@Override
@@ -515,9 +519,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
finishAsFailed(failure);
return;
}
+ final ThreadContext threadContext = threadPool.getThreadContext();
+ final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
+ context.close();
run();
}
@@ -528,6 +535,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void onTimeout(TimeValue timeout) {
+ context.close();
// Try one more time...
run();
}