From 8ae61c0fc48294fe8c7a2835edab2e57f30c56db Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 28 Jun 2017 10:38:22 +0200 Subject: Update global checkpoint when increasing primary term on replica (#25422) When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint; this gives us the guarantee that its global checkpoint is at least as high as the new primary and gives a starting point for the primary/replica resync. Relates to #25355, #10708 --- .../TransportReplicationActionTests.java | 4 +- .../replication/TransportWriteActionTests.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../elasticsearch/index/shard/IndexShardTests.java | 149 ++++++++++++++------- 4 files changed, 104 insertions(+), 53 deletions(-) (limited to 'core/src/test') diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index f91fab381d..a4a34b7002 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1161,7 +1161,7 @@ public class TransportReplicationActionTests extends ESTestCase { }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; - ActionListener callback = (ActionListener) invocation.getArguments()[1]; + ActionListener callback = (ActionListener) invocation.getArguments()[2]; final long primaryTerm = indexShard.getPrimaryTerm(); if (term < primaryTerm) { throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", @@ -1170,7 +1170,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index f0690ad67b..7e1ff9e1ca 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7962f23caf..b74596cda6 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -518,11 +518,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); replica.acquireReplicaOperationPermit( request.primaryTerm(), + globalCheckpoint, new ActionListener() { @Override public void onResponse(Releasable releasable) { try { - replica.updateGlobalCheckpointOnReplica(globalCheckpoint); performOnReplica(request, replica); releasable.close(); listener.onResponse( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5b068b2377..16baf57fd7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -275,13 +275,22 @@ public class IndexShardTests extends IndexShardTestCase { // expected } try { - indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null, + ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } + public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException { + IndexShard indexShard = newShard(false); + expectThrows(IndexShardNotStartedException.class, () -> + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), + SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX)); + closeShards(indexShard); + } + public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException { final IndexShard indexShard = newStartedShard(false); @@ -299,6 +308,7 @@ public class IndexShardTests extends IndexShardTestCase { } indexShard.acquireReplicaOperationPermit( indexShard.getPrimaryTerm(), + indexShard.getGlobalCheckpoint(), new ActionListener() { @Override public void onResponse(Releasable releasable) { @@ -477,7 +487,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { @@ -503,11 +513,11 @@ public class IndexShardTests extends IndexShardTestCase { private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX); return fut.get(); } - public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException { + public void testOperationPermitOnReplicaShards() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; final boolean engineClosed; @@ -557,10 +567,17 @@ public class IndexShardTests extends IndexShardTestCase { final long primaryTerm = indexShard.getPrimaryTerm(); final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration; - final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); - assertEquals(1, indexShard.getActiveOperationsCount()); - final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); - assertEquals(2, indexShard.getActiveOperationsCount()); + final Releasable operation1; + final Releasable operation2; + if (engineClosed == false) { + operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); + assertEquals(1, indexShard.getActiveOperationsCount()); + operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); + assertEquals(2, indexShard.getActiveOperationsCount()); + } else { + operation1 = null; + operation2 = null; + } { final AtomicBoolean onResponse = new AtomicBoolean(); @@ -579,7 +596,8 @@ public class IndexShardTests extends IndexShardTestCase { } }; - indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired, + ThreadPool.Names.INDEX); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -593,6 +611,21 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicReference onFailure = new AtomicReference<>(); final CyclicBarrier barrier = new CyclicBarrier(2); final long newPrimaryTerm = primaryTerm + 1 + randomInt(20); + if (engineClosed == false) { + assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + final long newGlobalCheckPoint; + if (engineClosed || randomBoolean()) { + newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } else { + long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100); + // advance local checkpoint + for (int i = 0; i <= localCheckPoint; i++) { + indexShard.markSeqNoAsNoop(i, indexShard.getPrimaryTerm(), "dummy doc"); + } + newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint); + } // but you can not increment with a new primary term until the operations on the older primary term complete final Thread thread = new Thread(() -> { try { @@ -600,55 +633,72 @@ public class IndexShardTests extends IndexShardTestCase { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - indexShard.acquireReplicaOperationPermit( - newPrimaryTerm, - new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - onResponse.set(true); - releasable.close(); - finish(); - } + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); + onResponse.set(true); + releasable.close(); + finish(); + } - @Override - public void onFailure(Exception e) { - onFailure.set(e); - finish(); - } + @Override + public void onFailure(Exception e) { + onFailure.set(e); + finish(); + } - private void finish() { - try { - barrier.await(); - } catch (final BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - } - }, + private void finish() { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + try { + indexShard.acquireReplicaOperationPermit( + newPrimaryTerm, + newGlobalCheckPoint, + listener, ThreadPool.Names.SAME); + } catch (Exception e) { + listener.onFailure(e); + } }); thread.start(); barrier.await(); - // our operation should be blocked until the previous operations complete - assertFalse(onResponse.get()); - assertNull(onFailure.get()); - assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - Releasables.close(operation1); - // our operation should still be blocked - assertFalse(onResponse.get()); - assertNull(onFailure.get()); - assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - Releasables.close(operation2); - barrier.await(); - // now lock acquisition should have succeeded - assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - if (engineClosed) { + if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) { + barrier.await(); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); assertFalse(onResponse.get()); - assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); + assertThat(onFailure.get(), instanceOf(IndexShardNotStartedException.class)); + Releasables.close(operation1); + Releasables.close(operation2); } else { - assertTrue(onResponse.get()); + // our operation should be blocked until the previous operations complete + assertFalse(onResponse.get()); assertNull(onFailure.get()); - assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); + Releasables.close(operation1); + // our operation should still be blocked + assertFalse(onResponse.get()); + assertNull(onFailure.get()); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); + Releasables.close(operation2); + barrier.await(); + // now lock acquisition should have succeeded + assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + if (engineClosed) { + assertFalse(onResponse.get()); + assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); + } else { + assertTrue(onResponse.get()); + assertNull(onFailure.get()); + assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); + } } thread.join(); assertEquals(0, indexShard.getActiveOperationsCount()); @@ -676,6 +726,7 @@ public class IndexShardTests extends IndexShardTestCase { } indexShard.acquireReplicaOperationPermit( primaryTerm + increment, + indexShard.getGlobalCheckpoint(), new ActionListener() { @Override public void onResponse(Releasable releasable) { -- cgit v1.2.3