summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2017-07-05 09:17:16 -0400
committerGitHub <noreply@github.com>2017-07-05 09:17:16 -0400
commit7dcd81b41b668488c968004927832f13e3b52ad2 (patch)
tree44a7392997475b98147dba964e4b0c26d7a2162f
parent7c637a0bfef315b11b3f714dc25645d033aa7632 (diff)
Throw back replica local checkpoint on new primary
This commit causes a replica to throw back its local checkpoint to the global checkpoint when learning of a new primary through a replica operation. Relates #25452
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java13
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java9
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java13
-rw-r--r--core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java20
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java131
5 files changed, 166 insertions, 20 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
index a1ad5dfa6f..9af9f00b1d 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
@@ -122,6 +122,19 @@ public class LocalCheckpointTracker {
}
/**
+ * Resets the checkpoint to the specified value and clears the record of processed sequence numbers.
+ * <p>
+ * The new value must be an assigned sequence number and must not exceed the current checkpoint; both conditions are
+ * only checked with assertions.
+ *
+ * @param checkpoint the local checkpoint to reset this tracker to
+ */
+ synchronized void resetCheckpoint(final long checkpoint) {
+ // the unassigned marker is rejected as a reset target
+ assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ // a reset may only move the checkpoint backwards or leave it where it is
+ assert checkpoint <= this.checkpoint;
+ // forget everything recorded in processedSeqNo and restart tracking right after the new checkpoint
+ processedSeqNo.clear();
+ firstProcessedSeqNo = checkpoint + 1;
+ this.checkpoint = checkpoint;
+ }
+
+ /**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
* @return the current checkpoint
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 6d8b87599a..05f5fea2dc 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -107,6 +107,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
}
/**
+ * Resets the local checkpoint to the specified value by delegating to the local checkpoint tracker.
+ *
+ * @param localCheckpoint the local checkpoint to reset to
+ */
+ public void resetLocalCheckpoint(final long localCheckpoint) {
+ localCheckpointTracker.resetCheckpoint(localCheckpoint);
+ }
+
+ /**
* The current sequence number stats.
*
* @return stats encapsulating the maximum sequence number, the local checkpoint and the global checkpoint
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index db0f27a28c..021f37d457 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2058,6 +2058,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
+ final long currentGlobalCheckpoint = getGlobalCheckpoint();
+ final long localCheckpoint;
+ if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+ } else {
+ localCheckpoint = currentGlobalCheckpoint;
+ }
+ logger.trace(
+ "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
+ operationPrimaryTerm,
+ getLocalCheckpoint(),
+ localCheckpoint);
+ getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
index 3d280b4d28..e2978ffc51 100644
--- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
+++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
@@ -236,4 +237,23 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
thread.join();
}
+
+ /**
+ * Marks a random subset of sequence numbers as completed, resets the checkpoint to a random value that does not exceed
+ * the current checkpoint, and verifies the state of the tracker after the reset.
+ */
+ public void testResetCheckpoint() {
+ final int operations = 1024 - scaledRandomIntBetween(0, 1024);
+ int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
+ for (int i = 0; i < operations; i++) {
+ // rarely skip a sequence number so that the completed sequence numbers may contain gaps
+ if (!rarely()) {
+ tracker.markSeqNoAsCompleted(i);
+ maxSeqNo = i;
+ }
+ }
+
+ // the reset target is drawn from the range [NO_OPS_PERFORMED, current checkpoint]
+ final int localCheckpoint =
+ randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
+ tracker.resetCheckpoint(localCheckpoint);
+ assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
+ // the maximum sequence number is expected to be unaffected by the reset
+ assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
+ assertThat(tracker.processedSeqNo, empty());
+ // the next generated sequence number is expected to continue after the maximum sequence number
+ assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 9093274a49..a838681c90 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@@ -142,7 +143,6 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
@@ -405,26 +405,10 @@ public class IndexShardTests extends IndexShardTestCase {
// this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
- int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
- boolean gap = false;
- for (int i = 0; i < operations; i++) {
- if (!rarely()) {
- final String id = Integer.toString(i);
- SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
- new BytesArray("{}"), XContentType.JSON);
- indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
- 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
- getMappingUpdater(indexShard, sourceToParse.type()));
- max = i;
- } else {
- gap = true;
- }
- }
+ final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
- final int maxSeqNo = max;
- if (gap) {
- assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
- }
+ final int maxSeqNo = result.maxSeqNo;
+ final boolean gap = result.gap;
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
@@ -626,6 +610,12 @@ public class IndexShardTests extends IndexShardTestCase {
}
newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
}
+ final long expectedLocalCheckpoint;
+ if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+ } else {
+ expectedLocalCheckpoint = newGlobalCheckPoint;
+ }
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@@ -637,6 +627,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
@@ -697,6 +688,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
@@ -707,6 +699,56 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
+ /**
+ * Verifies that a replica throws its local checkpoint back to the global checkpoint when it learns of a new primary term
+ * while acquiring a replica operation permit, and that the local checkpoint advances again on subsequent indexing.
+ *
+ * @throws IOException if an I/O exception occurs while indexing on or closing the shard
+ * @throws InterruptedException if the test thread is interrupted while waiting for the permit listener
+ */
+ public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
+ final IndexShard indexShard = newStartedShard(false);
+
+ // this is large enough that most of the time there will be at least one gap
+ final int operations = 1024 - scaledRandomIntBetween(0, 1024);
+ indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
+
+ // the global checkpoint known to the replica before the new primary term arrives
+ final long globalCheckpointOnReplica =
+ randomIntBetween(
+ Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+ Math.toIntExact(indexShard.getLocalCheckpoint()));
+ indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
+
+ // the global checkpoint carried by the replica operation from the new primary
+ final int globalCheckpoint =
+ randomIntBetween(
+ Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+ Math.toIntExact(indexShard.getLocalCheckpoint()));
+ final CountDownLatch latch = new CountDownLatch(1);
+ final java.util.concurrent.atomic.AtomicReference<Exception> failure = new java.util.concurrent.atomic.AtomicReference<>();
+ indexShard.acquireReplicaOperationPermit(
+ indexShard.primaryTerm + 1,
+ globalCheckpoint,
+ new ActionListener<Releasable>() {
+ @Override
+ public void onResponse(final Releasable releasable) {
+ releasable.close();
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(final Exception e) {
+ // record the failure and release the latch so that the test fails instead of waiting forever
+ failure.set(e);
+ latch.countDown();
+ }
+ },
+ ThreadPool.Names.SAME);
+
+ latch.await();
+ assertNull(failure.get());
+ if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
+ && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
+ } else {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
+ }
+
+ // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
+ final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
+ assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
+
+ closeShards(indexShard);
+ }
+
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);
@@ -1966,6 +2008,55 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
+ /**
+ * Holder for the outcome of {@code indexOnReplicaWithGaps}.
+ */
+ class Result {
+ // the local checkpoint that the indexing helper expects the shard to have after indexing
+ private final int localCheckpoint;
+ // the largest sequence number that was indexed, or the starting offset if nothing was indexed
+ private final int maxSeqNo;
+ // whether at least one sequence number was skipped while indexing
+ private final boolean gap;
+
+ Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
+ this.localCheckpoint = localCheckpoint;
+ this.maxSeqNo = maxSeqNo;
+ this.gap = gap;
+ }
+ }
+
+ /**
+ * Index on the specified shard while introducing sequence number gaps. Operations are applied for the sequence numbers
+ * strictly between {@code offset} and {@code operations}. Each sequence number is rarely skipped, except for the last
+ * one which is always indexed so that every skipped sequence number is a genuine gap below the maximum sequence number.
+ *
+ * @param indexShard the shard
+ * @param operations the exclusive upper bound on the sequence numbers to index
+ * @param offset the sequence number immediately preceding the first sequence number to index; it is also used as the
+ * initial value of the expected local checkpoint and of the maximum sequence number
+ * @return the expected local checkpoint, the maximum sequence number, and whether or not a gap was introduced
+ * @throws IOException if an I/O exception occurs while indexing on the shard
+ */
+ private Result indexOnReplicaWithGaps(
+ final IndexShard indexShard,
+ final int operations,
+ final int offset) throws IOException {
+ int localCheckpoint = offset;
+ int max = offset;
+ boolean gap = false;
+ for (int i = offset + 1; i < operations; i++) {
+ // the last sequence number is never skipped: a skip with nothing indexed after it is not a gap, and it would
+ // otherwise trip the gap assertion below; rarely() is still drawn first on every iteration
+ if (!rarely() || i == operations - 1) {
+ final String id = Integer.toString(i);
+ SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
+ new BytesArray("{}"), XContentType.JSON);
+ indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
+ 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
+ getMappingUpdater(indexShard, sourceToParse.type()));
+ // the expected local checkpoint only advances while the indexed sequence numbers are contiguous
+ if (!gap && i == localCheckpoint + 1) {
+ localCheckpoint++;
+ }
+ max = i;
+ } else {
+ gap = true;
+ }
+ }
+ assert localCheckpoint == indexShard.getLocalCheckpoint();
+ // a gap implies that the local checkpoint is stuck below the maximum sequence number
+ assert !gap || (localCheckpoint != max);
+ return new Result(localCheckpoint, max, gap);
+ }
+
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;