diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index ab88d73d3b..fd649f046e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -67,8 +68,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc this.clusterService = clusterService; } + + @Override + protected final void doExecute(final Request request, final ActionListener<Response> listener) { + throw new UnsupportedOperationException("the task parameter is required for this operation"); + } + @Override - protected void doExecute(final Request request, final ActionListener<Response> listener) { + protected void doExecute(Task task, Request request, ActionListener<Response> listener) { final ClusterState clusterState = clusterService.state(); List<ShardId> shards = shards(request, clusterState); final CopyOnWriteArrayList<ShardResponse> shardsResponses = new CopyOnWriteArrayList(); @@ -90,13 +97,13 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc @Override public void onFailure(Throwable e) { logger.trace("{}: got failure from {}", actionName, shardId); - int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1; + int totalNumCopies = clusterState.getMetaData().index(shardId.getIndexName()).getNumberOfReplicas() + 1; ShardResponse shardResponse = newShardResponse(); ReplicationResponse.ShardInfo.Failure[] failures; if (TransportActions.isShardNotAvailableException(e)) { failures = new ReplicationResponse.ShardInfo.Failure[0]; } else { - ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true); + ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId, null, e, ExceptionsHelper.status(e), true); failures = new ReplicationResponse.ShardInfo.Failure[totalNumCopies]; Arrays.fill(failures, failure); } @@ -107,12 +114,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc } } }; - shardExecute(request, shardId, shardActionListener); + shardExecute(task, request, shardId, shardActionListener); } } - protected void shardExecute(Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) { - replicatedBroadcastShardAction.execute(newShardRequest(request, shardId), shardActionListener); + protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) { + ShardRequest shardRequest = newShardRequest(request, shardId); + shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); + replicatedBroadcastShardAction.execute(shardRequest, shardActionListener); } /** @@ -154,7 +163,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc shardFailures = new ArrayList<>(); } for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { - shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause()))); + shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(failure.fullShardId(), failure.getCause()))); } } } |