summaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2017-06-26 14:09:15 -0400
committerGitHub <noreply@github.com>2017-06-26 14:09:15 -0400
commitc6a03bc5497dda8aeffe36e56e8ce45c4ad09f73 (patch)
tree7e57b91fc985eece7ae94fa520fc072b1421cd08 /core/src/test
parent4306315ff6d1bccea59c18e370d56199458df586 (diff)
Introduce primary context (#25122)
* Introduce primary context The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff. * Fix test * Add assertion messages * Javadocs * Barrier between marking a shard in sync and relocating * Fix misplaced call * Paranoia * Better latch countdown * Catch any exception * Fix comment * Fix wait for cluster state relocation test * Update knowledge via update local checkpoint API * toString * Visibility * Refactor permit * Push down * Imports * Docs * Fix compilation * Remove assertion * Fix compilation * Remove context wrapper * Move PrimaryContext to new package * Piping for cluster state version This commit adds piping for the cluster state version to the global checkpoint tracker. We do not use it yet. * Remove unused import * Implement versioning in tracker * Fix test * Unneeded public * Imports * Promote on our own * Add tests * Import * Newline * Update comment * Serialization * Assertion message * Update stale comment * Remove newline * Less verbose * Remove redundant assertion * Tracking -> in-sync * Assertions * Just say no Friends do not let friends block the cluster state update thread on network operations. * Extra newline * Add allocation ID to assertion * Rename method * Another rename * Introduce sealing * Sealing tests * One more assertion * Fix imports * Safer sealing * Remove check * Remove another sealed check
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java11
-rw-r--r--core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java270
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java18
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java5
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java10
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/RelocationIT.java8
9 files changed, 280 insertions, 51 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 8811083baa..e9c8916634 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -2022,7 +2022,10 @@ public class InternalEngineTests extends ESTestCase {
initialEngine = engine;
initialEngine
.seqNoService()
- .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet());
+ .updateAllocationIdsFromMaster(
+ randomNonNegativeLong(),
+ new HashSet<>(Arrays.asList("primary", "replica")),
+ Collections.emptySet());
for (int op = 0; op < opCount; op++) {
final String id;
// mostly index, sometimes delete
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index e7518bd594..7962f23caf 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -122,6 +122,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
+ private long clusterStateVersion;
private IndexShard primary;
private IndexMetaData indexMetaData;
private final List<IndexShard> replicas;
@@ -142,6 +143,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting));
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
+ clusterStateVersion = 1;
updateAllocationIDsOnPrimary();
for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
addReplica();
@@ -222,6 +224,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
for (final IndexShard replica : replicas) {
recoverReplica(replica);
@@ -241,6 +244,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replicas.add(replica);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@@ -255,6 +259,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
getEngineFactory(shardRouting));
replicas.add(newReplica);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
return newReplica;
}
@@ -274,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
+
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@@ -289,6 +295,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
fut.onFailure(e);
}
}));
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
return fut;
}
@@ -296,6 +303,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
synchronized boolean removeReplica(IndexShard replica) {
final boolean removed = replicas.remove(replica);
if (removed) {
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
return removed;
@@ -315,6 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
+ clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@@ -402,7 +411,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
initializing.add(shard.allocationId().getId());
}
}
- primary.updateAllocationIdsFromMaster(active, initializing);
+ primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing);
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java
index 61eb458132..ae4aab107f 100644
--- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java
+++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java
@@ -19,9 +19,13 @@
package org.elasticsearch.index.seqno;
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@@ -29,7 +33,6 @@ import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,11 +46,15 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.not;
+import static org.mockito.Mockito.mock;
public class GlobalCheckpointTrackerTests extends ESTestCase {
@@ -79,6 +86,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testGlobalCheckpointUpdate() {
+ final long initialClusterStateVersion = randomNonNegativeLong();
Map<String, Long> allocations = new HashMap<>();
Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> active = new HashSet<>(activeWithCheckpoints.keySet());
@@ -107,7 +115,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
- tracker.updateAllocationIdsFromMaster(active, initializing);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing);
initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getGlobalCheckpoint()));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
@@ -130,7 +138,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
- tracker.updateAllocationIdsFromMaster(newActive, initializing);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActive, initializing);
// now notify for the new id
tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4));
@@ -146,6 +154,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
assigned.putAll(active);
assigned.putAll(initializing);
tracker.updateAllocationIdsFromMaster(
+ randomNonNegativeLong(),
active.keySet(),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
@@ -166,7 +175,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
- tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
+ tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
@@ -184,7 +193,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
- tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
+ tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
@@ -196,6 +205,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
+ final long initialClusterStateVersion = randomNonNegativeLong();
final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
@@ -211,7 +221,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
- tracker.updateAllocationIdsFromMaster(active, initializing);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
} else {
@@ -223,11 +233,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
// now remove shards
if (randomBoolean()) {
- tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
- tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
@@ -243,7 +253,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean complete = new AtomicBoolean();
final String inSyncAllocationId =randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(16);
- tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
+ tracker.updateAllocationIdsFromMaster(
+ randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@@ -291,7 +302,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean interrupted = new AtomicBoolean();
final String inSyncAllocationId = randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(32);
- tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
+ tracker.updateAllocationIdsFromMaster(
+ randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@@ -329,21 +341,14 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testUpdateAllocationIdsFromMaster() throws Exception {
+ final long initialClusterStateVersion = randomNonNegativeLong();
final int numberOfActiveAllocationsIds = randomIntBetween(2, 16);
- final Set<String> activeAllocationIds =
- IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet());
final int numberOfInitializingIds = randomIntBetween(2, 16);
- final Set<String> initializingIds =
- IntStream.range(0, numberOfInitializingIds).mapToObj(i -> {
- do {
- final String initializingId = randomAlphaOfLength(16);
- // ensure we do not duplicate an allocation ID in active and initializing sets
- if (!activeAllocationIds.contains(initializingId)) {
- return initializingId;
- }
- } while (true);
- }).collect(Collectors.toSet());
- tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds);
+ final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
+ randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+ final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+ final Set<String> initializingIds = activeAndInitializingAllocationIds.v2();
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds);
// first we assert that the in-sync and tracking sets are set up correctly
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
@@ -364,7 +369,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final List<String> removingInitializingAllocationIds = randomSubsetOf(initializingIds);
final Set<String> newInitializingAllocationIds =
initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet());
- tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
@@ -376,7 +381,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
*/
newActiveAllocationIds.add(randomAlphaOfLength(32));
newInitializingAllocationIds.add(randomAlphaOfLength(64));
- tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(
newActiveAllocationIds
@@ -416,7 +421,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
// using a different length than we have been using above ensures that we can not collide with a previous allocation ID
final String newSyncingAllocationId = randomAlphaOfLength(128);
newInitializingAllocationIds.add(newSyncingAllocationId);
- tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds);
final CyclicBarrier barrier = new CyclicBarrier(2);
final Thread thread = new Thread(() -> {
try {
@@ -450,7 +455,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
* the in-sync set even if we receive a cluster state update that does not reflect this.
*
*/
- tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds);
assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId));
assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId));
}
@@ -471,7 +476,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final String active = randomAlphaOfLength(16);
final String initializing = randomAlphaOfLength(32);
- tracker.updateAllocationIdsFromMaster(Collections.singleton(active), Collections.singleton(initializing));
+ tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing));
final CyclicBarrier barrier = new CyclicBarrier(4);
@@ -516,7 +521,216 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
markingThread.join();
assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint));
+ }
+
+ public void testPrimaryContextOlderThanAppliedClusterState() {
+ final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE - 1) + 1;
+ final int numberOfActiveAllocationsIds = randomIntBetween(0, 8);
+ final int numberOfInitializingIds = randomIntBetween(0, 8);
+ final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
+ randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+ final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+ final Set<String> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+
+ /*
+ * We are going to establish a primary context from a cluster state version that is not newer than the applied cluster state version
+ * on the tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs
+ * in the primary context is a superset of the active allocation IDs in the applied cluster state with the caveat that an existing
+ * initializing allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the
+ * set of initializing allocation IDs is otherwise arbitrary.
+ */
+ final int numberOfAdditionalInitializingAllocationIds = randomIntBetween(0, 8);
+ final Set<String> initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds));
+ final Set<String> newInitializingAllocationIds =
+ randomAllocationIdsExcludingExistingIds(
+ Sets.union(activeAllocationIds, initializingAllocationIds), numberOfAdditionalInitializingAllocationIds);
+ final Set<String> contextInitializingIds = Sets.union(
+ new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))),
+ newInitializingAllocationIds);
+
+ final int numberOfAdditionalActiveAllocationIds = randomIntBetween(0, 8);
+ final Set<String> contextActiveAllocationIds = Sets.union(
+ Sets.union(
+ activeAllocationIds,
+ randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfAdditionalActiveAllocationIds)),
+ initializedAllocationIds);
+
+ final ObjectLongMap<String> activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
+ for (final String allocationId : contextActiveAllocationIds) {
+ activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
+ }
+ final ObjectLongMap<String> initializingAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
+ for (final String allocationId : contextInitializingIds) {
+ initializingAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
+ }
+
+ final PrimaryContext primaryContext = new PrimaryContext(
+ initialClusterStateVersion - randomIntBetween(0, Math.toIntExact(initialClusterStateVersion) - 1),
+ activeAllocationIdsLocalCheckpoints,
+ initializingAllocationIdsLocalCheckpoints);
+
+ tracker.updateAllocationIdsFromPrimaryContext(primaryContext);
+
+ // the primary context does not carry a newer cluster state version, so the applied version must be unchanged
+ assertThat(tracker.appliedClusterStateVersion, equalTo(initialClusterStateVersion));
+
+ // only existing active allocation IDs and initializing allocation IDs that moved to initialized should be in-sync
+ assertThat(
+ Sets.union(activeAllocationIds, initializedAllocationIds),
+ equalTo(
+ StreamSupport
+ .stream(tracker.inSyncLocalCheckpoints.keys().spliterator(), false)
+ .map(e -> e.value)
+ .collect(Collectors.toSet())));
+
+ // the local checkpoints known to the tracker for in-sync shards should match what is known in the primary context
+ for (final String allocationId : Sets.union(activeAllocationIds, initializedAllocationIds)) {
+ assertThat(
+ tracker.inSyncLocalCheckpoints.get(allocationId), equalTo(primaryContext.inSyncLocalCheckpoints().get(allocationId)));
+ }
+
+ // only existing initializing allocation IDs that did not move to initialized should be tracked
+ assertThat(
+ Sets.difference(initializingAllocationIds, initializedAllocationIds),
+ equalTo(
+ StreamSupport
+ .stream(tracker.trackingLocalCheckpoints.keys().spliterator(), false)
+ .map(e -> e.value)
+ .collect(Collectors.toSet())));
+
+ // the local checkpoints known to the tracker for initializing shards should match what is known in the primary context
+ for (final String allocationId : Sets.difference(initializingAllocationIds, initializedAllocationIds)) {
+ if (primaryContext.trackingLocalCheckpoints().containsKey(allocationId)) {
+ assertThat(
+ tracker.trackingLocalCheckpoints.get(allocationId),
+ equalTo(primaryContext.trackingLocalCheckpoints().get(allocationId)));
+ } else {
+ assertThat(tracker.trackingLocalCheckpoints.get(allocationId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ }
+ }
+
+ // the global checkpoint can only be computed from active allocation IDs and initializing allocation IDs that moved to initialized
+ final long globalCheckpoint =
+ StreamSupport
+ .stream(activeAllocationIdsLocalCheckpoints.spliterator(), false)
+ .filter(e -> tracker.inSyncLocalCheckpoints.containsKey(e.key) || initializedAllocationIds.contains(e.key))
+ .mapToLong(e -> e.value)
+ .min()
+ .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO);
+ assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint));
+ }
+
+ public void testPrimaryContextNewerThanAppliedClusterState() {
+ final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE);
+ final int numberOfActiveAllocationsIds = randomIntBetween(0, 8);
+ final int numberOfInitializingIds = randomIntBetween(0, 8);
+ final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
+ randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+ final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+ final Set<String> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+ tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+
+ /*
+ * We are going to establish a primary context from a cluster state version newer than the applied cluster state version on the
+ * tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the
+ * newer cluster state is a subset of the allocation IDs in the applied cluster state with the caveat that an existing initializing
+ * allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the set of
+ * initializing allocation IDs is otherwise arbitrary.
+ */
+ final int numberOfNewInitializingAllocationIds = randomIntBetween(0, 8);
+ final Set<String> initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds));
+ final Set<String> newInitializingAllocationIds =
+ randomAllocationIdsExcludingExistingIds(
+ Sets.union(activeAllocationIds, initializingAllocationIds), numberOfNewInitializingAllocationIds);
+
+ final ObjectLongMap<String> activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
+ for (final String allocationId : Sets.union(new HashSet<>(randomSubsetOf(activeAllocationIds)), initializedAllocationIds)) {
+ activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
+ }
+ final ObjectLongMap<String> initializingIdsLocalCheckpoints = new ObjectLongHashMap<>();
+ final Set<String> contextInitializingAllocationIds = Sets.union(
+ new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))),
+ newInitializingAllocationIds);
+ for (final String allocationId : contextInitializingAllocationIds) {
+ initializingIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
+ }
+
+ final PrimaryContext primaryContext =
+ new PrimaryContext(
+ initialClusterStateVersion + randomIntBetween(0, Integer.MAX_VALUE) + 1,
+ activeAllocationIdsLocalCheckpoints,
+ initializingIdsLocalCheckpoints);
+
+ tracker.updateAllocationIdsFromPrimaryContext(primaryContext);
+
+ final PrimaryContext trackerPrimaryContext = tracker.primaryContext();
+ try {
+ assertTrue(tracker.sealed());
+ final long globalCheckpoint =
+ StreamSupport
+ .stream(activeAllocationIdsLocalCheckpoints.values().spliterator(), false)
+ .mapToLong(e -> e.value)
+ .min()
+ .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO);
+
+ // the newer primary context is expected to be adopted wholesale: version, in-sync and tracking checkpoints must all match
+ assertThat(primaryContext.clusterStateVersion(), equalTo(trackerPrimaryContext.clusterStateVersion()));
+ assertThat(primaryContext.inSyncLocalCheckpoints(), equalTo(trackerPrimaryContext.inSyncLocalCheckpoints()));
+ assertThat(primaryContext.trackingLocalCheckpoints(), equalTo(trackerPrimaryContext.trackingLocalCheckpoints()));
+ assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint));
+ } finally {
+ tracker.releasePrimaryContext();
+ assertFalse(tracker.sealed());
+ }
+ }
+
+ public void testPrimaryContextSealing() {
+ // the tracker should start in the state of not being sealed
+ assertFalse(tracker.sealed());
+
+ // sampling the primary context should seal the tracker
+ tracker.primaryContext();
+ assertTrue(tracker.sealed());
+
+ // while sealed, every call below (including sampling the primary context again) should fail with an IllegalStateException
+ assertIllegalStateExceptionWhenSealed(() -> tracker.updateLocalCheckpoint(randomAlphaOfLength(16), randomNonNegativeLong()));
+ assertIllegalStateExceptionWhenSealed(() -> tracker.updateGlobalCheckpointOnReplica(randomNonNegativeLong()));
+ assertIllegalStateExceptionWhenSealed(
+ () -> tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.emptySet(), Collections.emptySet()));
+ assertIllegalStateExceptionWhenSealed(() -> tracker.updateAllocationIdsFromPrimaryContext(mock(PrimaryContext.class)));
+ assertIllegalStateExceptionWhenSealed(() -> tracker.primaryContext());
+ assertIllegalStateExceptionWhenSealed(() -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(16), randomNonNegativeLong()));
+
+ // releasing the primary context should unseal the tracker
+ tracker.releasePrimaryContext();
+ assertFalse(tracker.sealed());
+ }
+
+ private void assertIllegalStateExceptionWhenSealed(final ThrowingRunnable runnable) {
+ final IllegalStateException e = expectThrows(IllegalStateException.class, runnable);
+ assertThat(e, hasToString(containsString("global checkpoint tracker is sealed")));
+ }
+
+ private Tuple<Set<String>, Set<String>> randomActiveAndInitializingAllocationIds(
+ final int numberOfActiveAllocationsIds, final int numberOfInitializingIds) {
+ final Set<String> active = new HashSet<>();
+ for (int index = 0; index < numberOfActiveAllocationsIds; index++) {
+ active.add(randomAlphaOfLength(16) + index); // the index suffix keeps active IDs distinct even if two random prefixes coincide
+ }
+ return Tuple.tuple(active, randomAllocationIdsExcludingExistingIds(active, numberOfInitializingIds));
+ }
+ private Set<String> randomAllocationIdsExcludingExistingIds(final Set<String> existingAllocationIds, final int numberOfAllocationIds) {
+ return IntStream.range(0, numberOfAllocationIds).mapToObj(i -> {
+ do {
+ final String newAllocationId = randomAlphaOfLength(16) + i;
+ // check the exact ID that is returned (index suffix included); testing only the random prefix could never match a suffixed ID
+ if (!existingAllocationIds.contains(newAllocationId)) {
+ return newAllocationId;
+ }
+ } while (true);
+ }).collect(Collectors.toSet());
}
private void markAllocationIdAsInSyncQuietly(
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index cc837a0afe..a341c26898 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -536,7 +536,7 @@ public class IndexShardTests extends IndexShardTestCase {
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
- indexShard.relocated("test");
+ indexShard.relocated("test", primaryContext -> {});
engineClosed = false;
break;
}
@@ -551,7 +551,7 @@ public class IndexShardTests extends IndexShardTestCase {
if (shardRouting.primary() == false) {
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX));
- assertThat(e, hasToString(containsString("shard is not a primary")));
+ assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
}
final long primaryTerm = indexShard.getPrimaryTerm();
@@ -1042,7 +1042,7 @@ public class IndexShardTests extends IndexShardTestCase {
Thread recoveryThread = new Thread(() -> {
latch.countDown();
try {
- shard.relocated("simulated recovery");
+ shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1071,7 +1071,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> {
try {
- shard.relocated("simulated recovery");
+ shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1124,7 +1124,7 @@ public class IndexShardTests extends IndexShardTestCase {
AtomicBoolean relocated = new AtomicBoolean();
final Thread recoveryThread = new Thread(() -> {
try {
- shard.relocated("simulated recovery");
+ shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1158,7 +1158,7 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
- shard.relocated("test");
+ shard.relocated("test", primaryContext -> {});
expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
closeShards(shard);
}
@@ -1168,7 +1168,7 @@ public class IndexShardTests extends IndexShardTestCase {
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.updateRoutingEntry(originalRouting);
- expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test"));
+ expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {}));
closeShards(shard);
}
@@ -1187,7 +1187,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
- shard.relocated("test");
+ shard.relocated("test", primaryContext -> {});
}
});
relocationThread.start();
@@ -1362,7 +1362,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
shard.updateRoutingEntry(inRecoveryRouting);
- shard.relocated("simulate mark as relocated");
+ shard.relocated("simulate mark as relocated", primaryContext -> {});
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try {
shard.updateRoutingEntry(origRouting);
diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
index d19a51e627..bf0b728674 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -62,7 +62,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
String allocationId = shard.routingEntry().allocationId().getId();
- shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet());
+ shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 7a53f8f9f5..73e2d7eb0b 100644
--- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -322,6 +322,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
* Mock for {@link IndexShard}
*/
protected class MockIndexShard implements IndicesClusterStateService.Shard {
+ private volatile long clusterStateVersion;
private volatile ShardRouting shardRouting;
private volatile RecoveryState recoveryState;
private volatile Set<String> activeAllocationIds;
@@ -372,7 +373,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
- public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
+ public void updateAllocationIdsFromMaster(
+ long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
+ this.clusterStateVersion = applyingClusterStateVersion;
this.activeAllocationIds = activeAllocationIds;
this.initializingAllocationIds = initializingAllocationIds;
}
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index e73bd8a949..5532ad040f 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -35,6 +35,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
@@ -76,6 +77,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -453,8 +455,14 @@ public class RecoverySourceHandlerTests extends ESTestCase {
relocated.set(true);
assertTrue(recoveriesDelayed.get());
return null;
- }).when(shard).relocated(any(String.class));
+ }).when(shard).relocated(any(String.class), any(Consumer.class));
when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
+ doAnswer(invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ final ActionListener<Releasable> listener = (ActionListener<Releasable>)invocationOnMock.getArguments()[0];
+ listener.onResponse(() -> {});
+ return null;
+ }).when(shard).acquirePrimaryOperationPermit(any(ActionListener.class), any(String.class));
// final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class);
// when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef);
diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java
index e1a7a07448..b0d25f43bd 100644
--- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java
@@ -53,7 +53,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllS
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
-@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE")
+@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE")
public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
private final Logger logger = Loggers.getLogger(RecoveryWhileUnderLoadIT.class);
diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
index fe83847bff..48f6fdeaed 100644
--- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -514,14 +514,6 @@ public class RelocationIT extends ESIntegTestCase {
// refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down
client().admin().indices().prepareRefresh("test").get();
- /*
- * We have to execute a second refresh as in the face of relocations, the relocation target is not aware of the in-sync set and so
- * the first refresh would bring back the local checkpoint for any shards added to the in-sync set that the relocation target was
- * not tracking.
- */
- // TODO: remove this after a primary context is transferred during relocation handoff
- client().admin().indices().prepareRefresh("test").get();
-
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {