summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java')
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java149
1 files changed, 100 insertions, 49 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 5b068b2377..16baf57fd7 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -275,13 +275,22 @@ public class IndexShardTests extends IndexShardTestCase {
// expected
}
try {
- indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null,
+ ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
}
+ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
+ IndexShard indexShard = newShard(false);
+ expectThrows(IndexShardNotStartedException.class, () ->
+ indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
+ SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX));
+ closeShards(indexShard);
+ }
+
public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
@@ -299,6 +308,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
indexShard.acquireReplicaOperationPermit(
indexShard.getPrimaryTerm(),
+ indexShard.getGlobalCheckpoint(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@@ -477,7 +487,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
- indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX);
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {
@@ -503,11 +513,11 @@ public class IndexShardTests extends IndexShardTestCase {
private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
- indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX);
return fut.get();
}
- public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
+ public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
final boolean engineClosed;
@@ -557,10 +567,17 @@ public class IndexShardTests extends IndexShardTestCase {
final long primaryTerm = indexShard.getPrimaryTerm();
final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration;
- final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
- assertEquals(1, indexShard.getActiveOperationsCount());
- final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
- assertEquals(2, indexShard.getActiveOperationsCount());
+ final Releasable operation1;
+ final Releasable operation2;
+ if (engineClosed == false) {
+ operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+ assertEquals(1, indexShard.getActiveOperationsCount());
+ operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+ assertEquals(2, indexShard.getActiveOperationsCount());
+ } else {
+ operation1 = null;
+ operation2 = null;
+ }
{
final AtomicBoolean onResponse = new AtomicBoolean();
@@ -579,7 +596,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
};
- indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX);
+ indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired,
+ ThreadPool.Names.INDEX);
assertFalse(onResponse.get());
assertTrue(onFailure.get());
@@ -593,6 +611,21 @@ public class IndexShardTests extends IndexShardTestCase {
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(2);
final long newPrimaryTerm = primaryTerm + 1 + randomInt(20);
+ if (engineClosed == false) {
+ assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ }
+ final long newGlobalCheckPoint;
+ if (engineClosed || randomBoolean()) {
+ newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ } else {
+ long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100);
+ // advance local checkpoint
+ for (int i = 0; i <= localCheckPoint; i++) {
+ indexShard.markSeqNoAsNoop(i, indexShard.getPrimaryTerm(), "dummy doc");
+ }
+ newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
+ }
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@@ -600,55 +633,72 @@ public class IndexShardTests extends IndexShardTestCase {
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
- indexShard.acquireReplicaOperationPermit(
- newPrimaryTerm,
- new ActionListener<Releasable>() {
- @Override
- public void onResponse(Releasable releasable) {
- assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
- onResponse.set(true);
- releasable.close();
- finish();
- }
+ ActionListener<Releasable> listener = new ActionListener<Releasable>() {
+ @Override
+ public void onResponse(Releasable releasable) {
+ assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+ onResponse.set(true);
+ releasable.close();
+ finish();
+ }
- @Override
- public void onFailure(Exception e) {
- onFailure.set(e);
- finish();
- }
+ @Override
+ public void onFailure(Exception e) {
+ onFailure.set(e);
+ finish();
+ }
- private void finish() {
- try {
- barrier.await();
- } catch (final BrokenBarrierException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- },
+ private void finish() {
+ try {
+ barrier.await();
+ } catch (final BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ try {
+ indexShard.acquireReplicaOperationPermit(
+ newPrimaryTerm,
+ newGlobalCheckPoint,
+ listener,
ThreadPool.Names.SAME);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
});
thread.start();
barrier.await();
- // our operation should be blocked until the previous operations complete
- assertFalse(onResponse.get());
- assertNull(onFailure.get());
- assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
- Releasables.close(operation1);
- // our operation should still be blocked
- assertFalse(onResponse.get());
- assertNull(onFailure.get());
- assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
- Releasables.close(operation2);
- barrier.await();
- // now lock acquisition should have succeeded
- assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
- if (engineClosed) {
+ if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) {
+ barrier.await();
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
assertFalse(onResponse.get());
- assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+ assertThat(onFailure.get(), instanceOf(IndexShardNotStartedException.class));
+ Releasables.close(operation1);
+ Releasables.close(operation2);
} else {
- assertTrue(onResponse.get());
+ // our operation should be blocked until the previous operations complete
+ assertFalse(onResponse.get());
assertNull(onFailure.get());
- assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+ Releasables.close(operation1);
+ // our operation should still be blocked
+ assertFalse(onResponse.get());
+ assertNull(onFailure.get());
+ assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+ Releasables.close(operation2);
+ barrier.await();
+ // now lock acquisition should have succeeded
+ assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+ if (engineClosed) {
+ assertFalse(onResponse.get());
+ assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+ } else {
+ assertTrue(onResponse.get());
+ assertNull(onFailure.get());
+ assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+ assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+ }
}
thread.join();
assertEquals(0, indexShard.getActiveOperationsCount());
@@ -676,6 +726,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + increment,
+ indexShard.getGlobalCheckpoint(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {