summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/support
diff options
context:
space:
mode:
authorLee Hinman <lee@writequit.org>2017-01-13 10:09:18 -0700
committerLee Hinman <lee@writequit.org>2017-01-13 10:09:18 -0700
commitcd236c4de4a9edb8a5fe2a3d159eecd67815f92b (patch)
tree1023d178f6fe4466ec989a1398ac252f1640ecec /core/src/test/java/org/elasticsearch/action/support
parentb4c8c21553fe3348e9c447e0deb49dbd81864c9e (diff)
parente93fdb846083a29dc65505e111e32553d6f7be6c (diff)
Merge remote-tracking branch 'zareek/enhancement/use_shard_bulk_for_single_ops'
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java38
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java118
3 files changed, 94 insertions, 66 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java
index fbeac88dbe..2e1a00afc2 100644
--- a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java
+++ b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java
@@ -52,7 +52,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
fail("can't index, does not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
- assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
+ assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]"));
// but really, all is well
}
@@ -81,7 +81,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
fail("can't index, not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
- assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
+ assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]"));
// but really, all is well
}
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index 471315bde2..b929681032 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -492,9 +492,11 @@ public class TransportReplicationActionTests extends ESTestCase {
}
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
- ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
+ createReplicatedOperation(Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
+ TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@@ -542,9 +544,11 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean();
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
- ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
+ createReplicatedOperation(Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
+ TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@@ -577,7 +581,7 @@ public class TransportReplicationActionTests extends ESTestCase {
};
Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request();
- Request replicaRequest = primary.perform(request).replicaRequest;
+ Request replicaRequest = (Request) primary.perform(request).replicaRequest;
assertThat(replicaRequest.primaryTerm(), equalTo(primaryTerm));
@@ -687,13 +691,15 @@ public class TransportReplicationActionTests extends ESTestCase {
action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(),
createTransportChannel(new PlainActionFuture<>()), null) {
@Override
- protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
- ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
+ protected ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> createReplicatedOperation(
+ Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
+ TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertFalse(executeOnReplicas);
assertFalse(executed.getAndSet(true));
return new NoopReplicationOperation(request, actionListener);
}
+
}.run();
assertThat(executed.get(), equalTo(true));
}
@@ -753,9 +759,11 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean respondWithError = i == 3;
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
- ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
+ createReplicatedOperation(Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
+ TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
@@ -1148,14 +1156,14 @@ public class TransportReplicationActionTests extends ESTestCase {
return indexShard;
}
- class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult> {
- public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult> listener) {
+ class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> {
+ public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult<Request, Response>> listener) {
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override
public void execute() throws Exception {
- this.resultListener.onResponse(action.new PrimaryResult(null, new Response()));
+ this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response()));
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index c069de8919..730528965a 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -36,7 +36,6 @@ import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.HashSet;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.mockito.Matchers.any;
@@ -55,21 +54,27 @@ public class TransportWriteActionTests extends ESTestCase {
}
public void testPrimaryNoRefreshCall() throws Exception {
- noRefreshCall(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond);
+ TestRequest request = new TestRequest();
+ request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
+ testAction.shardOperationOnPrimary(request, indexShard);
+ CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
+ result.respond(listener);
+ assertNotNull(listener.response);
+ assertNull(listener.failure);
+ verify(indexShard, never()).refresh(any());
+ verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testReplicaNoRefreshCall() throws Exception {
- noRefreshCall(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond);
- }
-
- private <Result, Response> void noRefreshCall(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
- BiConsumer<Result, CapturingActionListener<Response>> responder)
- throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
- Result result = action.apply(new TestAction(), request, indexShard);
- CapturingActionListener<Response> listener = new CapturingActionListener<>();
- responder.accept(result, listener);
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WriteReplicaResult<TestRequest> result =
+ testAction.shardOperationOnReplica(request, indexShard);
+ CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
+ result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard, never()).refresh(any());
@@ -77,47 +82,66 @@ public class TransportWriteActionTests extends ESTestCase {
}
public void testPrimaryImmediateRefresh() throws Exception {
- immediateRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond, r -> assertTrue(r.forcedRefresh));
+ TestRequest request = new TestRequest();
+ request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
+ testAction.shardOperationOnPrimary(request, indexShard);
+ CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
+ result.respond(listener);
+ assertNotNull(listener.response);
+ assertNull(listener.failure);
+ assertTrue(listener.response.forcedRefresh);
+ verify(indexShard).refresh("refresh_flag_index");
+ verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testReplicaImmediateRefresh() throws Exception {
- immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {});
- }
-
- private <Result, Response> void immediateRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
- BiConsumer<Result, CapturingActionListener<Response>> responder,
- Consumer<Response> responseChecker) throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
- Result result = action.apply(new TestAction(), request, indexShard);
- CapturingActionListener<Response> listener = new CapturingActionListener<>();
- responder.accept(result, listener);
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WriteReplicaResult<TestRequest> result =
+ testAction.shardOperationOnReplica(request, indexShard);
+ CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
+ result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
- responseChecker.accept(listener.response);
verify(indexShard).refresh("refresh_flag_index");
verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testPrimaryWaitForRefresh() throws Exception {
- waitForRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond,
- (r, forcedRefresh) -> assertEquals(forcedRefresh, r.forcedRefresh));
- }
+ TestRequest request = new TestRequest();
+ request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
- public void testReplicaWaitForRefresh() throws Exception {
- waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {});
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
+ testAction.shardOperationOnPrimary(request, indexShard);
+ CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
+ result.respond(listener);
+ assertNull(listener.response); // Haven't reallresponded yet
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
+ verify(indexShard, never()).refresh(any());
+ verify(indexShard).addRefreshListener(any(), refreshListener.capture());
+
+ // Now we can fire the listener manually and we'll get a response
+ boolean forcedRefresh = randomBoolean();
+ refreshListener.getValue().accept(forcedRefresh);
+ assertNotNull(listener.response);
+ assertNull(listener.failure);
+ assertEquals(forcedRefresh, listener.response.forcedRefresh);
}
- private <Result, Response> void waitForRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
- BiConsumer<Result, CapturingActionListener<Response>> responder,
- BiConsumer<Response, Boolean> resultChecker) throws Exception {
+ public void testReplicaWaitForRefresh() throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
- Result result = action.apply(new TestAction(), request, indexShard);
- CapturingActionListener<Response> listener = new CapturingActionListener<>();
- responder.accept(result, listener);
+ TestAction testAction = new TestAction();
+ TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
+ CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
+ result.respond(listener);
assertNull(listener.response); // Haven't responded yet
-
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
verify(indexShard, never()).refresh(any());
@@ -128,13 +152,12 @@ public class TransportWriteActionTests extends ESTestCase {
refreshListener.getValue().accept(forcedRefresh);
assertNotNull(listener.response);
assertNull(listener.failure);
- resultChecker.accept(listener.response, forcedRefresh);
}
public void testDocumentFailureInShardOperationOnPrimary() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(true, true);
- TransportWriteAction<TestRequest, TestRequest, TestResponse>.WritePrimaryResult writePrimaryResult =
+ TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> writePrimaryResult =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
writePrimaryResult.respond(listener);
@@ -145,7 +168,7 @@ public class TransportWriteActionTests extends ESTestCase {
public void testDocumentFailureInShardOperationOnReplica() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(randomBoolean(), true);
- TransportWriteAction<TestRequest, TestRequest, TestResponse>.WriteReplicaResult writeReplicaResult =
+ TransportWriteAction.WriteReplicaResult<TestRequest> writeReplicaResult =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
writeReplicaResult.respond(listener);
@@ -176,23 +199,24 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
- protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception {
- final WritePrimaryResult primaryResult;
+ protected WritePrimaryResult<TestRequest, TestResponse> shardOperationOnPrimary(
+ TestRequest request, IndexShard primary) throws Exception {
+ final WritePrimaryResult<TestRequest, TestResponse> primaryResult;
if (withDocumentFailureOnPrimary) {
- primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary);
+ primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger);
} else {
- primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary);
+ primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger);
}
return primaryResult;
}
@Override
- protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
- final WriteReplicaResult replicaResult;
+ protected WriteReplicaResult<TestRequest> shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
+ final WriteReplicaResult<TestRequest> replicaResult;
if (withDocumentFailureOnReplica) {
- replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica);
+ replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger);
} else {
- replicaResult = new WriteReplicaResult(request, location, null, replica);
+ replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger);
}
return replicaResult;
}
@@ -232,8 +256,4 @@ public class TransportWriteActionTests extends ESTestCase {
this.failure = failure;
}
}
-
- private interface ThrowingTriFunction<A, B, C, R> {
- R apply(A a, B b, C c) throws Exception;
- }
}