diff options
author | Lee Hinman <lee@writequit.org> | 2017-01-13 10:09:18 -0700 |
---|---|---|
committer | Lee Hinman <lee@writequit.org> | 2017-01-13 10:09:18 -0700 |
commit | cd236c4de4a9edb8a5fe2a3d159eecd67815f92b (patch) | |
tree | 1023d178f6fe4466ec989a1398ac252f1640ecec /core/src/test/java/org/elasticsearch/action/support | |
parent | b4c8c21553fe3348e9c447e0deb49dbd81864c9e (diff) | |
parent | e93fdb846083a29dc65505e111e32553d6f7be6c (diff) |
Merge remote-tracking branch 'zareek/enhancement/use_shard_bulk_for_single_ops'
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
3 files changed, 94 insertions, 66 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java index fbeac88dbe..2e1a00afc2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java @@ -52,7 +52,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase { fail("can't index, does not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]")); // but really, all is well } @@ -81,7 +81,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase { fail("can't index, not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]")); // but really, all is well } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 471315bde2..b929681032 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -492,9 +492,11 @@ public class TransportReplicationActionTests extends ESTestCase { } action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request, - ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>> + createReplicatedOperation(Request request, + ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener, + TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -542,9 +544,11 @@ public class TransportReplicationActionTests extends ESTestCase { AtomicBoolean executed = new AtomicBoolean(); action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request, - ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>> + createReplicatedOperation(Request request, + ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener, + TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -577,7 +581,7 @@ public class TransportReplicationActionTests extends ESTestCase { }; Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); - Request replicaRequest = primary.perform(request).replicaRequest; + Request replicaRequest = (Request) primary.perform(request).replicaRequest; assertThat(replicaRequest.primaryTerm(), equalTo(primaryTerm)); @@ -687,13 +691,15 @@ public class TransportReplicationActionTests extends ESTestCase { action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(), createTransportChannel(new PlainActionFuture<>()), null) { @Override - protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request, - ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference, + protected ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> createReplicatedOperation( + Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener, + TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { assertFalse(executeOnReplicas); assertFalse(executed.getAndSet(true)); return new NoopReplicationOperation(request, actionListener); } + }.run(); assertThat(executed.get(), equalTo(true)); } @@ -753,9 +759,11 @@ public class TransportReplicationActionTests extends ESTestCase { final boolean respondWithError = i == 3; action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request, - ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>> + createReplicatedOperation(Request request, + ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener, + TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { assertIndexShardCounter(1); if (throwExceptionOnCreation) { throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); @@ -1148,14 +1156,14 @@ public class TransportReplicationActionTests extends ESTestCase { return indexShard; } - class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult> { - public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult> listener) { + class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> { + public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult<Request, Response>> listener) { super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override public void execute() throws Exception { - this.resultListener.onResponse(action.new PrimaryResult(null, new Response())); + this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response())); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index c069de8919..730528965a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -36,7 +36,6 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.HashSet; -import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.mockito.Matchers.any; @@ -55,21 +54,27 @@ public class TransportWriteActionTests extends ESTestCase { } public void testPrimaryNoRefreshCall() throws Exception { - noRefreshCall(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond); + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener<TestResponse> listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + verify(indexShard, never()).refresh(any()); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testReplicaNoRefreshCall() throws Exception { - noRefreshCall(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond); - } - - private <Result, Response> void noRefreshCall(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action, - BiConsumer<Result, CapturingActionListener<Response>> responder) - throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener<Response> listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult<TestRequest> result = + testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>(); + result.respond(listener); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -77,47 +82,66 @@ public class TransportWriteActionTests extends ESTestCase { } public void testPrimaryImmediateRefresh() throws Exception { - immediateRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond, r -> assertTrue(r.forcedRefresh)); + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener<TestResponse> listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + assertTrue(listener.response.forcedRefresh); + verify(indexShard).refresh("refresh_flag_index"); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testReplicaImmediateRefresh() throws Exception { - immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {}); - } - - private <Result, Response> void immediateRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action, - BiConsumer<Result, CapturingActionListener<Response>> responder, - Consumer<Response> responseChecker) throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener<Response> listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult<TestRequest> result = + testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>(); + result.respond(listener); assertNotNull(listener.response); assertNull(listener.failure); - responseChecker.accept(listener.response); verify(indexShard).refresh("refresh_flag_index"); verify(indexShard, never()).addRefreshListener(any(), any()); } public void testPrimaryWaitForRefresh() throws Exception { - waitForRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond, - (r, forcedRefresh) -> assertEquals(forcedRefresh, r.forcedRefresh)); - } + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - public void testReplicaWaitForRefresh() throws Exception { - waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {}); + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener<TestResponse> listener = new CapturingActionListener<>(); + result.respond(listener); + assertNull(listener.response); // Haven't reallresponded yet + + @SuppressWarnings({ "unchecked", "rawtypes" }) + ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); + verify(indexShard, never()).refresh(any()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + + // Now we can fire the listener manually and we'll get a response + boolean forcedRefresh = randomBoolean(); + refreshListener.getValue().accept(forcedRefresh); + assertNotNull(listener.response); + assertNull(listener.failure); + assertEquals(forcedRefresh, listener.response.forcedRefresh); } - private <Result, Response> void waitForRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action, - BiConsumer<Result, CapturingActionListener<Response>> responder, - BiConsumer<Response, Boolean> resultChecker) throws Exception { + public void testReplicaWaitForRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener<Response> listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>(); + result.respond(listener); assertNull(listener.response); // Haven't responded yet - @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); @@ -128,13 +152,12 @@ public class TransportWriteActionTests extends ESTestCase { refreshListener.getValue().accept(forcedRefresh); assertNotNull(listener.response); assertNull(listener.failure); - resultChecker.accept(listener.response, forcedRefresh); } public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - TransportWriteAction<TestRequest, TestRequest, TestResponse>.WritePrimaryResult writePrimaryResult = + TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> writePrimaryResult = testAction.shardOperationOnPrimary(request, indexShard); CapturingActionListener<TestResponse> listener = new CapturingActionListener<>(); writePrimaryResult.respond(listener); @@ -145,7 +168,7 @@ public class TransportWriteActionTests extends ESTestCase { public void testDocumentFailureInShardOperationOnReplica() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(randomBoolean(), true); - TransportWriteAction<TestRequest, TestRequest, TestResponse>.WriteReplicaResult writeReplicaResult = + TransportWriteAction.WriteReplicaResult<TestRequest> writeReplicaResult = testAction.shardOperationOnReplica(request, indexShard); CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>(); writeReplicaResult.respond(listener); @@ -176,23 +199,24 @@ public class TransportWriteActionTests extends ESTestCase { } @Override - protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception { - final WritePrimaryResult primaryResult; + protected WritePrimaryResult<TestRequest, TestResponse> shardOperationOnPrimary( + TestRequest request, IndexShard primary) throws Exception { + final WritePrimaryResult<TestRequest, TestResponse> primaryResult; if (withDocumentFailureOnPrimary) { - primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary); + primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); } else { - primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary); + primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); } return primaryResult; } @Override - protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { - final WriteReplicaResult replicaResult; + protected WriteReplicaResult<TestRequest> shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { + final WriteReplicaResult<TestRequest> replicaResult; if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica); + replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); } else { - replicaResult = new WriteReplicaResult(request, location, null, replica); + replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); } return replicaResult; } @@ -232,8 +256,4 @@ public class TransportWriteActionTests extends ESTestCase { this.failure = failure; } } - - private interface ThrowingTriFunction<A, B, C, R> { - R apply(A a, B b, C c) throws Exception; - } } |