summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java')
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java22
1 files changed, 17 insertions, 5 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 9d091429f2..385b342efb 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
@@ -83,6 +87,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
+ private final PrimaryReplicaSyncer primaryReplicaSyncer;
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
@@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
- snapshotShardsService, globalCheckpointSyncAction);
+ snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer);
}
// for tests
@@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
super(settings);
this.buildInIndexListener =
Arrays.asList(
@@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
this.shardStateAction = shardStateAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
+ this.primaryReplicaSyncer = primaryReplicaSyncer;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
- shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
+ shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
+ primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
@@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- void updatePrimaryTerm(long primaryTerm);
+ void updatePrimaryTerm(long primaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);
/**
* Notifies the service of the current allocation ids in the cluster state.