summaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java20
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java131
2 files changed, 131 insertions, 20 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
index 3d280b4d28..e2978ffc51 100644
--- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
+++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
@@ -236,4 +237,23 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
thread.join();
}
+
+ public void testResetCheckpoint() {
+ final int operations = 1024 - scaledRandomIntBetween(0, 1024);
+ int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
+ for (int i = 0; i < operations; i++) {
+ if (!rarely()) {
+ tracker.markSeqNoAsCompleted(i);
+ maxSeqNo = i;
+ }
+ }
+
+ final int localCheckpoint =
+ randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
+ tracker.resetCheckpoint(localCheckpoint);
+ assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
+ assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
+ assertThat(tracker.processedSeqNo, empty());
+ assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 9093274a49..a838681c90 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@@ -142,7 +143,6 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
@@ -405,26 +405,10 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
- int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
- boolean gap = false;
- for (int i = 0; i < operations; i++) {
- if (!rarely()) {
- final String id = Integer.toString(i);
- SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
- new BytesArray("{}"), XContentType.JSON);
- indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
- 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
- getMappingUpdater(indexShard, sourceToParse.type()));
- max = i;
- } else {
- gap = true;
- }
- }
+ final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
- final int maxSeqNo = max;
- if (gap) {
- assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
- }
+ final int maxSeqNo = result.maxSeqNo;
+ final boolean gap = result.gap;
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
@@ -626,6 +610,12 @@ public class IndexShardTests extends IndexShardTestCase {
}
newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
}
+ final long expectedLocalCheckpoint;
+ if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+ } else {
+ expectedLocalCheckpoint = newGlobalCheckPoint;
+ }
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@@ -637,6 +627,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
@@ -697,6 +688,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
@@ -707,6 +699,56 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
+ public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
+ final IndexShard indexShard = newStartedShard(false);
+
+ // most of the time this is large enough that most of the time there will be at least one gap
+ final int operations = 1024 - scaledRandomIntBetween(0, 1024);
+ indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
+
+ final long globalCheckpointOnReplica =
+ randomIntBetween(
+ Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+ Math.toIntExact(indexShard.getLocalCheckpoint()));
+ indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
+
+ final int globalCheckpoint =
+ randomIntBetween(
+ Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+ Math.toIntExact(indexShard.getLocalCheckpoint()));
+ final CountDownLatch latch = new CountDownLatch(1);
+ indexShard.acquireReplicaOperationPermit(
+ indexShard.primaryTerm + 1,
+ globalCheckpoint,
+ new ActionListener<Releasable>() {
+ @Override
+ public void onResponse(final Releasable releasable) {
+ releasable.close();
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(final Exception e) {
+
+ }
+ },
+ ThreadPool.Names.SAME);
+
+ latch.await();
+ if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
+ && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
+ } else {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
+ }
+
+ // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
+ final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
+
+ closeShards(indexShard);
+ }
+
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);
@@ -1966,6 +2008,55 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
+ class Result {
+ private final int localCheckpoint;
+ private final int maxSeqNo;
+ private final boolean gap;
+
+ Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
+ this.localCheckpoint = localCheckpoint;
+ this.maxSeqNo = maxSeqNo;
+ this.gap = gap;
+ }
+ }
+
+ /**
+ * Index on the specified shard while introducing sequence number gaps.
+ *
+ * @param indexShard the shard
+ * @param operations the number of operations
+ * @param offset the starting sequence number
+ * @return a pair of the maximum sequence number and whether or not a gap was introduced
+ * @throws IOException if an I/O exception occurs while indexing on the shard
+ */
+ private Result indexOnReplicaWithGaps(
+ final IndexShard indexShard,
+ final int operations,
+ final int offset) throws IOException {
+ int localCheckpoint = offset;
+ int max = offset;
+ boolean gap = false;
+ for (int i = offset + 1; i < operations; i++) {
+ if (!rarely()) {
+ final String id = Integer.toString(i);
+ SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
+ new BytesArray("{}"), XContentType.JSON);
+ indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
+ 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
+ getMappingUpdater(indexShard, sourceToParse.type()));
+ if (!gap && i == localCheckpoint + 1) {
+ localCheckpoint++;
+ }
+ max = i;
+ } else {
+ gap = true;
+ }
+ }
+ assert localCheckpoint == indexShard.getLocalCheckpoint();
+ assert !gap || (localCheckpoint != max);
+ return new Result(localCheckpoint, max, gap);
+ }
+
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;