diff options
author | Yannick Welsch <yannick@welsch.lu> | 2017-06-28 10:38:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-28 10:38:22 +0200 |
commit | 8ae61c0fc48294fe8c7a2835edab2e57f30c56db (patch) | |
tree | 4225020ca1e701bc8ea2d6cbaeaad6663d2e2b67 /core/src/main | |
parent | dd6751d3e997208f79a6444984e63b3ea9f42581 (diff) |
Update global checkpoint when increasing primary term on replica (#25422)
When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint; this guarantees that its global checkpoint is at least as high as the new primary's and gives a starting point for the primary/replica resync.
Relates to #25355, #10708
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java | 5 | ||||
-rw-r--r-- | core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 31 |
2 files changed, 31 insertions, 5 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 35e4753a9d..b364870e23 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -183,7 +183,7 @@ public abstract class TransportReplicationAction< /** * Synchronously execute the specified replica operation. This is done under a permit from - * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}. + * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -521,7 +521,6 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Releasable releasable) { try { - replica.updateGlobalCheckpointOnReplica(globalCheckpoint); final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = @@ -596,7 +595,7 @@ public abstract class TransportReplicationAction< throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor); + replica.acquireReplicaOperationPermit(request.primaryTerm, globalCheckpoint, this, executor); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 13ced02f6b..10fe3ccfd7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ 
b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2031,29 +2031,47 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * name. * * @param operationPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request * @param onPermitAcquired the listener for permit acquisition * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed */ - public void acquireReplicaOperationPermit( - final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) { + public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint, + final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) { verifyNotClosed(); verifyReplicationTarget(); + final boolean globalCheckpointUpdated; if (operationPrimaryTerm > primaryTerm) { synchronized (primaryTermMutex) { if (operationPrimaryTerm > primaryTerm) { + IndexShardState shardState = state(); + // only roll translog and update primary term if shard has made it past recovery + // Having a new primary term here means that the old primary failed and that there is a new primary, which again + // means that the master will fail this shard as all initializing shards are failed when a primary is selected + // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint + if (shardState != IndexShardState.POST_RECOVERY && + shardState != IndexShardState.STARTED && + shardState != IndexShardState.RELOCATED) { + throw new IndexShardNotStartedException(shardId, shardState); + } try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { assert operationPrimaryTerm > primaryTerm : "shard term already update. 
op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; + updateGlobalCheckpointOnReplica(globalCheckpoint); getEngine().getTranslog().rollGeneration(); }); + globalCheckpointUpdated = true; } catch (final Exception e) { onPermitAcquired.onFailure(e); return; } + } else { + globalCheckpointUpdated = false; } } + } else { + globalCheckpointUpdated = false; } assert operationPrimaryTerm <= primaryTerm @@ -2072,6 +2090,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl primaryTerm); onPermitAcquired.onFailure(new IllegalStateException(message)); } else { + if (globalCheckpointUpdated == false) { + try { + updateGlobalCheckpointOnReplica(globalCheckpoint); + } catch (Exception e) { + releasable.close(); + onPermitAcquired.onFailure(e); + return; + } + } onPermitAcquired.onResponse(releasable); } } |