diff options
author | Jason Tedor <jason@tedor.me> | 2017-05-19 16:17:22 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-19 16:17:22 -0400 |
commit | 4cd70cf9865ae15e09f0ed7170c05ea29e5356b1 (patch) | |
tree | da116ee8cab8792f583ce11c6df6d831f2699810 /core/src/test/java/org/elasticsearch/action/support | |
parent | cb7a8d5876ce133dd2e6418378f305fa40671ca5 (diff) |
Block older operations on primary term transition
Today a replica learns of a new primary term via a cluster state update
and there is not a clean transition between the older primary term and
the newer primary term. This commit modifies this situation so that:
- a replica shard learns of a new primary term via replication
operations executed under the mandate of the new primary
- when a replica shard learns of a new primary term, it blocks
operations on older terms from reaching the engine, with a clear
transition point between the operations on the older term and the
operations on the newer term
This work paves the way for a primary/replica sync on primary
promotion. Future work will also ensure a clean transition point on a
promoted primary, and prepare a replica shard for a sync with the
promoted primary.
Relates #24779
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
2 files changed, 4 insertions, 4 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0f8071cce3..89026d9d1d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1091,7 +1091,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1]; @@ -1103,7 +1103,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index ca16e816b4..f0690ad67b 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -444,7 +444,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1]; @@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); |