diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9d091429f2..385b342efb 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; @@ -83,6 +87,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final boolean sendRefreshMapping; private final List<IndexEventListener> buildInIndexListener; + private final PrimaryReplicaSyncer primaryReplicaSyncer; @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, @@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RepositoriesService repositoriesService, SearchService searchService, SyncedFlushService syncedFlushService, PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { + GlobalCheckpointSyncAction globalCheckpointSyncAction, + PrimaryReplicaSyncer primaryReplicaSyncer) { this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, - snapshotShardsService, globalCheckpointSyncAction); + snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer); } // for tests @@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RepositoriesService repositoriesService, SearchService searchService, SyncedFlushService syncedFlushService, PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { + GlobalCheckpointSyncAction globalCheckpointSyncAction, + PrimaryReplicaSyncer primaryReplicaSyncer) { super(settings); this.buildInIndexListener = Arrays.asList( @@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple this.shardStateAction = shardStateAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; + this.primaryReplicaSyncer = primaryReplicaSyncer; this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); } @@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); final Set<String> initializingIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); - shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id())); + shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), + primaryReplicaSyncer::resync); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { @@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple * Update the primary term. This method should only be invoked on primary shards. * * @param primaryTerm the new primary term + * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary */ - void updatePrimaryTerm(long primaryTerm); + void updatePrimaryTerm(long primaryTerm, + CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer); /** * Notifies the service of the current allocation ids in the cluster state. |