summaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2017-07-05 09:17:16 -0400
committerGitHub <noreply@github.com>2017-07-05 09:17:16 -0400
commit7dcd81b41b668488c968004927832f13e3b52ad2 (patch)
tree44a7392997475b98147dba964e4b0c26d7a2162f /core/src/main
parent7c637a0bfef315b11b3f714dc25645d033aa7632 (diff)
Throw back replica local checkpoint on new primary
This commit causes a replica to throwback its local checkpoint to the global checkpoint when learning of a new primary through a replica operation. Relates #25452
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java13
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java9
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java13
3 files changed, 35 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
index a1ad5dfa6f..9af9f00b1d 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
@@ -122,6 +122,19 @@ public class LocalCheckpointTracker {
}
/**
+ * Resets the checkpoint to the specified value.
+ *
+ * @param checkpoint the local checkpoint to reset this tracker to
+ */
+ synchronized void resetCheckpoint(final long checkpoint) {
+ assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ assert checkpoint <= this.checkpoint;
+ processedSeqNo.clear();
+ firstProcessedSeqNo = checkpoint + 1;
+ this.checkpoint = checkpoint;
+ }
+
+ /**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
* @return the current checkpoint
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 6d8b87599a..05f5fea2dc 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -107,6 +107,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
}
/**
+ * Resets the local checkpoint to the specified value.
+ *
+ * @param localCheckpoint the local checkpoint to reset to
+ */
+ public void resetLocalCheckpoint(final long localCheckpoint) {
+ localCheckpointTracker.resetCheckpoint(localCheckpoint);
+ }
+
+ /**
* The current sequence number stats.
*
* @return stats encapsulating the maximum sequence number, the local checkpoint and the global checkpoint
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index db0f27a28c..021f37d457 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2058,6 +2058,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
+ final long currentGlobalCheckpoint = getGlobalCheckpoint();
+ final long localCheckpoint;
+ if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+ } else {
+ localCheckpoint = currentGlobalCheckpoint;
+ }
+ logger.trace(
+ "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
+ operationPrimaryTerm,
+ getLocalCheckpoint(),
+ localCheckpoint);
+ getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;