summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-28 10:38:22 +0200
committerGitHub <noreply@github.com>2017-06-28 10:38:22 +0200
commit8ae61c0fc48294fe8c7a2835edab2e57f30c56db (patch)
tree4225020ca1e701bc8ea2d6cbaeaad6663d2e2b67 /core/src/main/java/org/elasticsearch
parentdd6751d3e997208f79a6444984e63b3ea9f42581 (diff)
Update global checkpoint when increasing primary term on replica (#25422)
When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint; this gives us the guarantee that its global checkpoint is at least as high as the new primary and gives a starting point for the primary/replica resync. Relates to #25355, #10708
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java5
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java31
2 files changed, 31 insertions, 5 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 35e4753a9d..b364870e23 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -183,7 +183,7 @@ public abstract class TransportReplicationAction<
/**
* Synchronously execute the specified replica operation. This is done under a permit from
- * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
+ * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
@@ -521,7 +521,6 @@ public abstract class TransportReplicationAction<
@Override
public void onResponse(Releasable releasable) {
try {
- replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
@@ -596,7 +595,7 @@ public abstract class TransportReplicationAction<
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
- replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
+ replica.acquireReplicaOperationPermit(request.primaryTerm, globalCheckpoint, this, executor);
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 13ced02f6b..10fe3ccfd7 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2031,29 +2031,47 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* name.
*
* @param operationPrimaryTerm the operation primary term
+ * @param globalCheckpoint the global checkpoint associated with the request
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/
- public void acquireReplicaOperationPermit(
- final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
+ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
+ final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed();
verifyReplicationTarget();
+ final boolean globalCheckpointUpdated;
if (operationPrimaryTerm > primaryTerm) {
synchronized (primaryTermMutex) {
if (operationPrimaryTerm > primaryTerm) {
+ IndexShardState shardState = state();
+ // only roll translog and update primary term if shard has made it past recovery
+ // Having a new primary term here means that the old primary failed and that there is a new primary, which again
+ // means that the master will fail this shard as all initializing shards are failed when a primary is selected
+ // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
+ if (shardState != IndexShardState.POST_RECOVERY &&
+ shardState != IndexShardState.STARTED &&
+ shardState != IndexShardState.RELOCATED) {
+ throw new IndexShardNotStartedException(shardId, shardState);
+ }
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
+ updateGlobalCheckpointOnReplica(globalCheckpoint);
getEngine().getTranslog().rollGeneration();
});
+ globalCheckpointUpdated = true;
} catch (final Exception e) {
onPermitAcquired.onFailure(e);
return;
}
+ } else {
+ globalCheckpointUpdated = false;
}
}
+ } else {
+ globalCheckpointUpdated = false;
}
assert operationPrimaryTerm <= primaryTerm
@@ -2072,6 +2090,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
primaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
+ if (globalCheckpointUpdated == false) {
+ try {
+ updateGlobalCheckpointOnReplica(globalCheckpoint);
+ } catch (Exception e) {
+ releasable.close();
+ onPermitAcquired.onFailure(e);
+ return;
+ }
+ }
onPermitAcquired.onResponse(releasable);
}
}