summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java')
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index e7518bd594..7962f23caf 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -122,6 +122,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
+ private long clusterStateVersion;
private IndexShard primary;
private IndexMetaData indexMetaData;
private final List<IndexShard> replicas;
@@ -142,6 +143,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting));
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
+ clusterStateVersion = 1;
updateAllocationIDsOnPrimary();
for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
addReplica();
@@ -222,6 +224,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
for (final IndexShard replica : replicas) {
recoverReplica(replica);
@@ -241,6 +244,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replicas.add(replica);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@@ -255,6 +259,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
getEngineFactory(shardRouting));
replicas.add(newReplica);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
return newReplica;
}
@@ -274,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
+
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@@ -289,6 +295,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
fut.onFailure(e);
}
}));
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
return fut;
}
@@ -296,6 +303,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
synchronized boolean removeReplica(IndexShard replica) {
final boolean removed = replicas.remove(replica);
if (removed) {
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
return removed;
@@ -315,6 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@@ -402,7 +411,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
initializing.add(shard.allocationId().getId());
}
}
- primary.updateAllocationIdsFromMaster(active, initializing);
+ primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing);
}
}