diff options
Diffstat (limited to 'core/src/test/java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java | 20 | ||||
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java | 131 |
2 files changed, 131 insertions, 20 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 3d280b4d28..e2978ffc51 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -236,4 +237,23 @@ public class LocalCheckpointTrackerTests extends ESTestCase { thread.join(); } + + public void testResetCheckpoint() { + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); + for (int i = 0; i < operations; i++) { + if (!rarely()) { + tracker.markSeqNoAsCompleted(i); + maxSeqNo = i; + } + } + + final int localCheckpoint = + randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint())); + tracker.resetCheckpoint(localCheckpoint); + assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); + assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); + assertThat(tracker.processedSeqNo, empty()); + assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9093274a49..a838681c90 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -142,7 +143,6 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -405,26 +405,10 @@ public class IndexShardTests extends IndexShardTestCase { // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); - int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); - boolean gap = false; - for (int i = 0; i < operations; i++) { - if (!rarely()) { - final String id = Integer.toString(i); - SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, - new BytesArray("{}"), XContentType.JSON); - indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), - 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, - getMappingUpdater(indexShard, sourceToParse.type())); - max = i; - } else { - gap = true; - } - } + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); - final int maxSeqNo = max; - if (gap) { - assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); - } + final int maxSeqNo = result.maxSeqNo; + final boolean gap = result.gap; // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); @@ -626,6 +610,12 @@ public class IndexShardTests extends IndexShardTestCase { } newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint); } + final long expectedLocalCheckpoint; + if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + expectedLocalCheckpoint = newGlobalCheckPoint; + } // but you can not increment with a new primary term until the operations on the older primary term complete final Thread thread = new Thread(() -> { try { @@ -637,6 +627,7 @@ public class IndexShardTests extends IndexShardTestCase { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); releasable.close(); @@ -697,6 +688,7 @@ public class IndexShardTests extends IndexShardTestCase { assertTrue(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } } @@ -707,6 +699,56 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } + public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + // most of the time this is large enough that most of the time there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); + + final long globalCheckpointOnReplica = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), + Math.toIntExact(indexShard.getLocalCheckpoint())); + indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica); + + final int globalCheckpoint = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), + Math.toIntExact(indexShard.getLocalCheckpoint())); + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquireReplicaOperationPermit( + indexShard.primaryTerm + 1, + globalCheckpoint, + new ActionListener<Releasable>() { + @Override + public void onResponse(final Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + + } + }, + ThreadPool.Names.SAME); + + latch.await(); + if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO + && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + } else { + assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); + } + + // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); + + closeShards(indexShard); + } + public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { final IndexShard indexShard = newStartedShard(false); @@ -1966,6 +2008,55 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(newShard); } + class Result { + private final int localCheckpoint; + private final int maxSeqNo; + private final boolean gap; + + Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) { + this.localCheckpoint = localCheckpoint; + this.maxSeqNo = maxSeqNo; + this.gap = gap; + } + } + + /** + * Index on the specified shard while introducing sequence number gaps. + * + * @param indexShard the shard + * @param operations the number of operations + * @param offset the starting sequence number + * @return a pair of the maximum sequence number and whether or not a gap was introduced + * @throws IOException if an I/O exception occurs while indexing on the shard + */ + private Result indexOnReplicaWithGaps( + final IndexShard indexShard, + final int operations, + final int offset) throws IOException { + int localCheckpoint = offset; + int max = offset; + boolean gap = false; + for (int i = offset + 1; i < operations; i++) { + if (!rarely()) { + final String id = Integer.toString(i); + SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, + new BytesArray("{}"), XContentType.JSON); + indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, + getMappingUpdater(indexShard, sourceToParse.type())); + if (!gap && i == localCheckpoint + 1) { + localCheckpoint++; + } + max = i; + } else { + gap = true; + } + } + assert localCheckpoint == indexShard.getLocalCheckpoint(); + assert !gap || (localCheckpoint != max); + return new Result(localCheckpoint, max, gap); + } + /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { private final String indexName; |