summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java23
1 files changed, 16 insertions, 7 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
index ab88d73d3b..fd649f046e 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
@@ -40,6 +40,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -67,8 +68,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
this.clusterService = clusterService;
}
+
+ @Override
+ protected final void doExecute(final Request request, final ActionListener<Response> listener) {
+ throw new UnsupportedOperationException("the task parameter is required for this operation");
+ }
+
@Override
- protected void doExecute(final Request request, final ActionListener<Response> listener) {
+ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState clusterState = clusterService.state();
List<ShardId> shards = shards(request, clusterState);
final CopyOnWriteArrayList<ShardResponse> shardsResponses = new CopyOnWriteArrayList();
@@ -90,13 +97,13 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
@Override
public void onFailure(Throwable e) {
logger.trace("{}: got failure from {}", actionName, shardId);
- int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
+ int totalNumCopies = clusterState.getMetaData().index(shardId.getIndexName()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();
ReplicationResponse.ShardInfo.Failure[] failures;
if (TransportActions.isShardNotAvailableException(e)) {
failures = new ReplicationResponse.ShardInfo.Failure[0];
} else {
- ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
+ ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId, null, e, ExceptionsHelper.status(e), true);
failures = new ReplicationResponse.ShardInfo.Failure[totalNumCopies];
Arrays.fill(failures, failure);
}
@@ -107,12 +114,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
}
}
};
- shardExecute(request, shardId, shardActionListener);
+ shardExecute(task, request, shardId, shardActionListener);
}
}
- protected void shardExecute(Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
- replicatedBroadcastShardAction.execute(newShardRequest(request, shardId), shardActionListener);
+ protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
+ ShardRequest shardRequest = newShardRequest(request, shardId);
+ shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
+ replicatedBroadcastShardAction.execute(shardRequest, shardActionListener);
}
/**
@@ -154,7 +163,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
shardFailures = new ArrayList<>();
}
for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
- shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause())));
+ shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(failure.fullShardId(), failure.getCause())));
}
}
}