diff options
author | Jason Tedor <jason@tedor.me> | 2017-06-30 10:59:03 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-30 10:59:03 -0400 |
commit | dd93ef3f24e91ebf8b8610beea0350ae04b53ced (patch) | |
tree | f07196bd34ec3bbb10d18a7a5752d138d50af3e2 | |
parent | c8da7f84a20b836671048749245dba3f2c95dbeb (diff) |
Add additional test for sequence-number recovery
This commit adds a test for a scenario where a replica receives an extra
document that the promoted replica does not receive, misses the
primary/replica re-sync, and then recovers from the newly-promoted
primary.
Relates #25493
-rw-r--r-- | core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | 1 | ||||
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java | 63 |
2 files changed, 64 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 429be167e7..dcb6f5759d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -356,6 +356,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint); if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) { + assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint(); /* * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global * checkpoint (i.e., it excludes any operations that may not be on the primary). 
Recovery will start at the first operation diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 9e030f68a3..87608dedd1 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -29,11 +29,15 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.store.Store; @@ -147,6 +151,65 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } } + /* + * Simulate a scenario with two replicas where one of the replicas receives an extra document, the other replica is promoted on primary + * failure, the receiving replica misses the primary/replica re-sync and then recovers from the primary. We expect that a + * sequence-number based recovery is performed and the extra document does not remain after recovery. 
+ */ + public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + final int docs = randomIntBetween(0, 16); + for (int i = 0; i < docs; i++) { + shards.index( + new IndexRequest("index", "type", Integer.toString(i)).source("{}", XContentType.JSON)); + } + + shards.flush(); + shards.syncGlobalCheckpoint(); + + final IndexShard oldPrimary = shards.getPrimary(); + final IndexShard promotedReplica = shards.getReplicas().get(0); + final IndexShard remainingReplica = shards.getReplicas().get(1); + // slip the extra document into the replica + remainingReplica.applyIndexOperationOnReplica( + remainingReplica.getLocalCheckpoint() + 1, + remainingReplica.getPrimaryTerm(), + 1, + VersionType.EXTERNAL, + randomNonNegativeLong(), + false, + SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON), + mapping -> {}); + shards.promoteReplicaToPrimary(promotedReplica); + oldPrimary.close("demoted", randomBoolean()); + oldPrimary.store().close(); + shards.removeReplica(remainingReplica); + remainingReplica.close("disconnected", false); + remainingReplica.store().close(); + // randomly introduce a conflicting document + final boolean extra = randomBoolean(); + if (extra) { + promotedReplica.applyIndexOperationOnPrimary( + Versions.MATCH_ANY, + VersionType.INTERNAL, + SourceToParse.source("index", "type", "primary", new BytesArray("{}"), XContentType.JSON), + randomNonNegativeLong(), + false, + mapping -> { + }); + } + final IndexShard recoveredReplica = + shards.addReplicaWithExistingPath(remainingReplica.shardPath(), remainingReplica.routingEntry().currentNodeId()); + shards.recoverReplica(recoveredReplica); + + assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty()); + assertThat(recoveredReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(extra ? 1 : 0)); + + shards.assertAllEqual(docs + (extra ? 
1 : 0)); + } + } + @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE") public void testRecoveryAfterPrimaryPromotion() throws Exception { try (ReplicationGroup shards = createGroup(2)) { |