author    Jason Tedor <jason@tedor.me>    2017-06-30 10:59:03 -0400
committer GitHub <noreply@github.com>    2017-06-30 10:59:03 -0400
commit    dd93ef3f24e91ebf8b8610beea0350ae04b53ced (patch)
tree      f07196bd34ec3bbb10d18a7a5752d138d50af3e2
parent    c8da7f84a20b836671048749245dba3f2c95dbeb (diff)
Add additional test for sequence-number recovery
This commit adds a test for a scenario where a replica receives an extra document that the promoted replica does not receive, misses the primary/replica re-sync, and then recovers from the newly-promoted primary. Relates #25493
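For orientation, a minimal sketch (illustrative values and names, not part of this commit) of the checkpoint arithmetic the scenario relies on: the extra document is indexed at the replica's local checkpoint plus one, above the last synced global checkpoint, so it is precisely the kind of unacknowledged operation that a sequence-number based recovery is allowed to discard.

    // Illustrative only; concrete numbers are assumed for the sketch.
    final int docs = 4;                               // docs indexed on every copy
    final long globalCheckpoint = docs - 1;           // after flush + global checkpoint sync
    final long extraDocSeqNo = globalCheckpoint + 1;  // the replica-only op slipped in by the test below
    // The divergent op sits strictly above the global checkpoint, so it was never
    // acknowledged on all in-sync copies and must not survive the recovery:
    assert extraDocSeqNo > globalCheckpoint;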
-rw-r--r--  core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java       1
-rw-r--r--  core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java  63
2 files changed, 64 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 429be167e7..dcb6f5759d 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -356,6 +356,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
+ assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint();
/*
* Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
* checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
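The added assertion follows from two facts: the local checkpoint can never run ahead of the maximum sequence number, and this branch is only taken when the maximum sequence number is at or below the global checkpoint. A minimal standalone sketch of that reasoning (the free-standing method and its name are ours, not the patch's; parameter names mirror the SeqNoStats accessors used in the hunk above):

    static boolean canUseSeqNoBasedRecovery(long maxSeqNo, long localCheckpoint, long globalCheckpoint) {
        // Invariant maintained by the engine: the local checkpoint never
        // exceeds the highest sequence number ever seen.
        assert localCheckpoint <= maxSeqNo;
        if (maxSeqNo <= globalCheckpoint) {
            // Transitively, localCheckpoint <= globalCheckpoint: the
            // assertion this commit adds.
            assert localCheckpoint <= globalCheckpoint;
            return true;  // replay the translog from globalCheckpoint + 1
        }
        return false;     // fall back to file-based recovery
    }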
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 9e030f68a3..87608dedd1 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -29,11 +29,15 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.store.Store;
@@ -147,6 +151,65 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
+ /*
+ * Simulate a scenario with two replicas where one of the replicas receives an extra document, the other replica is promoted on primary
+ * failure, the receiving replica misses the primary/replica re-sync and then recovers from the primary. We expect that a
+ * sequence-number based recovery is performed and the extra document does not remain after recovery.
+ */
+ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
+ try (ReplicationGroup shards = createGroup(2)) {
+ shards.startAll();
+ final int docs = randomIntBetween(0, 16);
+ for (int i = 0; i < docs; i++) {
+ shards.index(
+ new IndexRequest("index", "type", Integer.toString(i)).source("{}", XContentType.JSON));
+ }
+
+ shards.flush();
+ shards.syncGlobalCheckpoint();
+
+ final IndexShard oldPrimary = shards.getPrimary();
+ final IndexShard promotedReplica = shards.getReplicas().get(0);
+ final IndexShard remainingReplica = shards.getReplicas().get(1);
+ // slip the extra document into the replica
+ remainingReplica.applyIndexOperationOnReplica(
+ remainingReplica.getLocalCheckpoint() + 1,
+ remainingReplica.getPrimaryTerm(),
+ 1,
+ VersionType.EXTERNAL,
+ randomNonNegativeLong(),
+ false,
+ SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON),
+ mapping -> {});
+ shards.promoteReplicaToPrimary(promotedReplica);
+ oldPrimary.close("demoted", randomBoolean());
+ oldPrimary.store().close();
+ shards.removeReplica(remainingReplica);
+ remainingReplica.close("disconnected", false);
+ remainingReplica.store().close();
+ // randomly introduce a conflicting document
+ final boolean extra = randomBoolean();
+ if (extra) {
+ promotedReplica.applyIndexOperationOnPrimary(
+ Versions.MATCH_ANY,
+ VersionType.INTERNAL,
+ SourceToParse.source("index", "type", "primary", new BytesArray("{}"), XContentType.JSON),
+ randomNonNegativeLong(),
+ false,
+ mapping -> {
+ });
+ }
+ final IndexShard recoveredReplica =
+ shards.addReplicaWithExistingPath(remainingReplica.shardPath(), remainingReplica.routingEntry().currentNodeId());
+ shards.recoverReplica(recoveredReplica);
+
+ assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
+ assertThat(recoveredReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(extra ? 1 : 0));
+
+ shards.assertAllEqual(docs + (extra ? 1 : 0));
+ }
+ }
+
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
public void testRecoveryAfterPrimaryPromotion() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {