diff options
author | Jason Tedor <jason@tedor.me> | 2017-07-05 09:17:16 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-05 09:17:16 -0400 |
commit | 7dcd81b41b668488c968004927832f13e3b52ad2 (patch) | |
tree | 44a7392997475b98147dba964e4b0c26d7a2162f /core/src/main | |
parent | 7c637a0bfef315b11b3f714dc25645d033aa7632 (diff) |
Throw back replica local checkpoint on new primary
This commit causes a replica to throwback its local checkpoint to the
global checkpoint when learning of a new primary through a replica
operation.
Relates #25452
Diffstat (limited to 'core/src/main')
3 files changed, 35 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index a1ad5dfa6f..9af9f00b1d 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -122,6 +122,19 @@ public class LocalCheckpointTracker { } /** + * Resets the checkpoint to the specified value. + * + * @param checkpoint the local checkpoint to reset this tracker to + */ + synchronized void resetCheckpoint(final long checkpoint) { + assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; + assert checkpoint <= this.checkpoint; + processedSeqNo.clear(); + firstProcessedSeqNo = checkpoint + 1; + this.checkpoint = checkpoint; + } + + /** * The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}. * * @return the current checkpoint diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 6d8b87599a..05f5fea2dc 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -107,6 +107,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { } /** + * Resets the local checkpoint to the specified value. + * + * @param localCheckpoint the local checkpoint to reset to + */ + public void resetLocalCheckpoint(final long localCheckpoint) { + localCheckpointTracker.resetCheckpoint(localCheckpoint); + } + + /** * The current sequence number stats. * * @return stats encapsulating the maximum sequence number, the local checkpoint and the global checkpoint diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db0f27a28c..021f37d457 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2058,6 +2058,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; updateGlobalCheckpointOnReplica(globalCheckpoint); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long localCheckpoint; + if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + localCheckpoint = currentGlobalCheckpoint; + } + logger.trace( + "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", + operationPrimaryTerm, + getLocalCheckpoint(), + localCheckpoint); + getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint); getEngine().getTranslog().rollGeneration(); }); globalCheckpointUpdated = true; |