summaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-28 10:38:22 +0200
committerGitHub <noreply@github.com>2017-06-28 10:38:22 +0200
commit8ae61c0fc48294fe8c7a2835edab2e57f30c56db (patch)
tree4225020ca1e701bc8ea2d6cbaeaad6663d2e2b67 /core/src/test
parentdd6751d3e997208f79a6444984e63b3ea9f42581 (diff)
Update global checkpoint when increasing primary term on replica (#25422)
When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint. This guarantees that the replica's global checkpoint is at least as high as that of the new primary, and gives a starting point for the primary/replica resync. Relates to #25355, #10708
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java4
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java149
4 files changed, 104 insertions, 53 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index f91fab381d..a4a34b7002 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -1161,7 +1161,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
- ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
+ ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
final long primaryTerm = indexShard.getPrimaryTerm();
if (term < primaryTerm) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
@@ -1170,7 +1170,7 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
- }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
+ }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index f0690ad67b..7e1ff9e1ca 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
- }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
+ }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 7962f23caf..b74596cda6 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -518,11 +518,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.acquireReplicaOperationPermit(
request.primaryTerm(),
+ globalCheckpoint,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try {
- replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
performOnReplica(request, replica);
releasable.close();
listener.onResponse(
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 5b068b2377..16baf57fd7 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -275,13 +275,22 @@ public class IndexShardTests extends IndexShardTestCase {
// expected
}
try {
- indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null,
+ ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
}
+ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
+ IndexShard indexShard = newShard(false);
+ expectThrows(IndexShardNotStartedException.class, () ->
+ indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
+ SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX));
+ closeShards(indexShard);
+ }
+
public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
@@ -299,6 +308,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
indexShard.acquireReplicaOperationPermit(
indexShard.getPrimaryTerm(),
+ indexShard.getGlobalCheckpoint(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@@ -477,7 +487,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
- indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX);
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {
@@ -503,11 +513,11 @@ public class IndexShardTests extends IndexShardTestCase {
private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
- indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX);
return fut.get();
}
- public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
+ public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
final boolean engineClosed;
@@ -557,10 +567,17 @@ public class IndexShardTests extends IndexShardTestCase {
final long primaryTerm = indexShard.getPrimaryTerm();
final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration;
- final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
- assertEquals(1, indexShard.getActiveOperationsCount());
- final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
- assertEquals(2, indexShard.getActiveOperationsCount());
+ final Releasable operation1;
+ final Releasable operation2;
+ if (engineClosed == false) {
+ operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+ assertEquals(1, indexShard.getActiveOperationsCount());
+ operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+ assertEquals(2, indexShard.getActiveOperationsCount());
+ } else {
+ operation1 = null;
+ operation2 = null;
+ }
{
final AtomicBoolean onResponse = new AtomicBoolean();
@@ -579,7 +596,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
};
- indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired,
+ ThreadPool.Names.INDEX);
assertFalse(onResponse.get());
assertTrue(onFailure.get());
@@ -593,6 +611,21 @@ public class IndexShardTests extends IndexShardTestCase {
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(2);
final long newPrimaryTerm = primaryTerm + 1 + randomInt(20);
+ if (engineClosed == false) {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ }
+ final long newGlobalCheckPoint;
+ if (engineClosed || randomBoolean()) {
+ newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ } else {
+ long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100);
+ // advance local checkpoint
+ for (int i = 0; i <= localCheckPoint; i++) {
+ indexShard.markSeqNoAsNoop(i, indexShard.getPrimaryTerm(), "dummy doc");
+ }
+ newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
+ }
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@@ -600,55 +633,72 @@ public class IndexShardTests extends IndexShardTestCase {
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
- indexShard.acquireReplicaOperationPermit(
- newPrimaryTerm,
- new ActionListener<Releasable>() {
- @Override
- public void onResponse(Releasable releasable) {
- assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
- onResponse.set(true);
- releasable.close();
- finish();
- }
+ ActionListener<Releasable> listener = new ActionListener<Releasable>() {
+ @Override
+ public void onResponse(Releasable releasable) {
+ assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+ onResponse.set(true);
+ releasable.close();
+ finish();
+ }
- @Override
- public void onFailure(Exception e) {
- onFailure.set(e);
- finish();
- }
+ @Override
+ public void onFailure(Exception e) {
+ onFailure.set(e);
+ finish();
+ }
- private void finish() {
- try {
- barrier.await();
- } catch (final BrokenBarrierException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- },
+ private void finish() {
+ try {
+ barrier.await();
+ } catch (final BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ try {
+ indexShard.acquireReplicaOperationPermit(
+ newPrimaryTerm,
+ newGlobalCheckPoint,
+ listener,
ThreadPool.Names.SAME);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
});
thread.start();
barrier.await();
- // our operation should be blocked until the previous operations complete
- assertFalse(onResponse.get());
- assertNull(onFailure.get());
- assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
- Releasables.close(operation1);
- // our operation should still be blocked
- assertFalse(onResponse.get());
- assertNull(onFailure.get());
- assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
- Releasables.close(operation2);
- barrier.await();
- // now lock acquisition should have succeeded
- assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
- if (engineClosed) {
+ if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) {
+ barrier.await();
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
assertFalse(onResponse.get());
- assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+ assertThat(onFailure.get(), instanceOf(IndexShardNotStartedException.class));
+ Releasables.close(operation1);
+ Releasables.close(operation2);
} else {
- assertTrue(onResponse.get());
+ // our operation should be blocked until the previous operations complete
+ assertFalse(onResponse.get());
assertNull(onFailure.get());
- assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+ Releasables.close(operation1);
+ // our operation should still be blocked
+ assertFalse(onResponse.get());
+ assertNull(onFailure.get());
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+ Releasables.close(operation2);
+ barrier.await();
+ // now lock acquisition should have succeeded
+ assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ if (engineClosed) {
+ assertFalse(onResponse.get());
+ assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+ } else {
+ assertTrue(onResponse.get());
+ assertNull(onFailure.get());
+ assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+ }
}
thread.join();
assertEquals(0, indexShard.getActiveOperationsCount());
@@ -676,6 +726,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + increment,
+ indexShard.getGlobalCheckpoint(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {