diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0014404057..3c3ed714b5 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; @@ -302,7 +303,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ private final TransportChannel channel; // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs - private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); + private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) { this.request = request; @@ -313,9 +314,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ public void onFailure(Throwable t) { if (t instanceof RetryOnReplicaException) { logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request); + final ThreadContext threadContext = threadPool.getThreadContext(); + final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { + context.close(); // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; @@ -411,7 +415,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ ReroutePhase(Request request, ActionListener<Response> listener) { this.request = request; this.listener = listener; - this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger); + this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); } @Override @@ -515,9 +519,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ finishAsFailed(failure); return; } + final ThreadContext threadContext = threadPool.getThreadContext(); + final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { + context.close(); run(); } @@ -528,6 +535,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ @Override public void onTimeout(TimeValue timeout) { + context.close(); // Try one more time... run(); } |