summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java48
1 files changed, 33 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index b297220180..c40d3fb579 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@@ -60,6 +61,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
@@ -133,8 +135,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@Override
- protected void doExecute(Request request, ActionListener<Response> listener) {
- new ReroutePhase(request, listener).run();
+ protected final void doExecute(Request request, ActionListener<Response> listener) {
+ throw new UnsupportedOperationException("the task parameter is required for this operation");
+ }
+
+ @Override
+ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+ new ReroutePhase(task, request, listener).run();
}
protected abstract Response newResponseInstance();
@@ -243,8 +250,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
class OperationTransportHandler implements TransportRequestHandler<Request> {
@Override
- public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
- execute(request, new ActionListener<Response>() {
+ public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
+ execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
@@ -264,6 +271,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
});
}
+
+ @Override
+ public void messageReceived(Request request, TransportChannel channel) throws Exception {
+ throw new UnsupportedOperationException("the task parameter is required for this operation");
+ }
}
class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
@@ -297,7 +309,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
- private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
+ private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
@@ -308,9 +320,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
+ final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
+ context.close();
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
@@ -339,7 +353,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
}
private void failReplicaIfNeeded(Throwable t) {
- String index = request.shardId().getIndex();
+ String index = request.shardId().getIndex().getName();
int shardId = request.shardId().id();
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
if (ignoreReplicaException(t) == false) {
@@ -403,10 +417,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final ClusterStateObserver observer;
private final AtomicBoolean finished = new AtomicBoolean();
- ReroutePhase(Request request, ActionListener<Response> listener) {
+ ReroutePhase(Task task, Request request, ActionListener<Response> listener) {
this.request = request;
+ if (task != null) {
+ this.request.setParentTask(clusterService.localNode().getId(), task.getId());
+ }
this.listener = listener;
- this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
+ this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
}
@Override
@@ -432,7 +449,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
resolveRequest(state.metaData(), concreteIndex, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
- IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId().getIndex(), request.shardId().id());
+ IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
final ShardRouting primary = indexShard.primaryShard();
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
@@ -510,9 +527,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
finishAsFailed(failure);
return;
}
+ final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
+ context.close();
run();
}
@@ -523,6 +542,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void onTimeout(TimeValue timeout) {
+ context.close();
// Try one more time...
run();
}
@@ -637,7 +657,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
final int sizeActive;
final int requiredNumber;
- IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndex());
+ IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable != null) {
@@ -702,7 +722,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
protected Releasable getIndexShardOperationsCounter(ShardId shardId) {
- IndexService indexService = indicesService.indexServiceSafe(shardId.index().getName());
+ IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReference(indexShard);
}
@@ -876,7 +896,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
onReplicaFailure(nodeId, exp);
} else {
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
- logger.warn("{} {}", exp, shardId, message);
+ logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed(
shard,
indexUUID,
@@ -941,9 +961,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
- failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(
- shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false
- );
+ failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(shardId, entry.getKey(), entry.getValue(), restStatus, false);
}
} else {
failuresArray = ReplicationResponse.EMPTY;