summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/support
diff options
context:
space:
mode:
author Lee Hinman <lee@writequit.org> 2017-01-16 10:46:12 -0700
committer Lee Hinman <lee@writequit.org> 2017-02-03 14:39:46 -0700
commit 39e7c3091244ac1d77d8c482ef870e95c428d946 (patch)
tree 9bbc3f88c9b2c9beb6983389ca3acbbabf595e49 /core/src/test/java/org/elasticsearch/action/support
parent b0c9759441fbadc18047f3563d05248312bd35ae (diff)
Change certain replica failures not to fail the replica shard
This changes the way that replica failures are handled such that not all failures will cause the replica shard to be failed or marked as stale. In some cases, such as refresh operations or global checkpoint syncs, it is "okay" for the operation to fail without the shard being failed (because no data is out of sync). In these cases, instead of failing the shard, we should simply fail the operation and, in the event it is a user-facing operation, return a 5xx response code including the shard-specific failures. This was accomplished by having two forms of the `Replicas` proxy: one for non-write operations, which does not fail the shard, and one for write operations, which fails the shard when an operation fails. Relates to #10708
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r-- core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java 22
-rw-r--r-- core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java 199
-rw-r--r-- core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java 254
3 files changed, 361 insertions, 114 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
index a701193d22..459bafd3af 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
@@ -190,11 +190,11 @@ public class ReplicationOperationTests extends ESTestCase {
final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
- public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
- Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
- Consumer<Exception> onIgnoredFailure) {
+ public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
+ Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
+ Consumer<Exception> onIgnoredFailure) {
if (testPrimaryDemotedOnStaleShardCopies) {
- super.failShard(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
+ super.failShardIfNeeded(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
} else {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
@@ -202,12 +202,12 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
- public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
- Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
+ Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (testPrimaryDemotedOnStaleShardCopies) {
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
} else {
- super.markShardCopyAsStale(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure);
+ super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure);
}
}
};
@@ -486,8 +486,8 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
- public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
- Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+ public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
+ Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");
}
@@ -503,8 +503,8 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
- public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
- Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
+ Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (markedAsStaleCopies.add(allocationId) == false) {
fail("replica [" + allocationId + "] was marked as stale twice");
}
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index c2c484fe3e..1bd9437047 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -135,7 +135,7 @@ public class TransportReplicationActionTests extends ESTestCase {
private ClusterService clusterService;
private TransportService transportService;
private CapturingTransport transport;
- private Action action;
+ private TestAction action;
private ShardStateAction shardStateAction;
/* *
@@ -159,7 +159,7 @@ public class TransportReplicationActionTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
- action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
+ action = new TestAction(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
}
@After
@@ -185,9 +185,10 @@ public class TransportReplicationActionTests extends ESTestCase {
public void testBlocks() throws ExecutionException, InterruptedException {
Request request = new Request();
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
- Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
+ TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks",
+ transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.WRITE;
@@ -197,7 +198,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
assertPhase(task, "failed");
@@ -226,7 +227,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterBlockException.class);
assertIndexShardUninitialized();
- action = new Action(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) {
+ action = new TestAction(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return null;
@@ -253,8 +254,8 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(shardId).timeout("1ms");
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
assertPhase(task, "failed");
@@ -301,8 +302,8 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> relocation ongoing state:\n{}", clusterService.state());
Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
reroutePhase.run();
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
assertTrue(request.isRetrySet.compareAndSet(true, false));
@@ -340,10 +341,10 @@ public class TransportReplicationActionTests extends ESTestCase {
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
assertPhase(task, "failed");
@@ -364,17 +365,18 @@ public class TransportReplicationActionTests extends ESTestCase {
ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null;
- Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
+ TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", transportService,
+ clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel indexBlockLevel() {
return indexBlockLevel;
}
};
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
if (indexBlockLevel == ClusterBlockLevel.WRITE) {
assertListenerThrows("must throw block exception", listener, ClusterBlockException.class);
@@ -398,10 +400,10 @@ public class TransportReplicationActionTests extends ESTestCase {
} else {
request.timeout("1h");
}
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
@@ -452,9 +454,9 @@ public class TransportReplicationActionTests extends ESTestCase {
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
Request request = new Request(shardId);
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
- Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
+ TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertThat(request.shardId(), equalTo(shardId));
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
@@ -479,7 +481,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
setState(clusterService, state);
Request request = new Request(shardId).timeout("1ms");
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
AtomicBoolean executed = new AtomicBoolean();
@@ -492,11 +494,12 @@ public class TransportReplicationActionTests extends ESTestCase {
}
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
- createReplicatedOperation(Request request,
- ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
- TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
+ createReplicatedOperation(
+ Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
+ TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@@ -521,7 +524,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertThat(requests.size(), equalTo(1));
assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]"));
assertPhase(task, "primary_delegation");
- transport.handleResponse(requests.get(0).requestId, new Response());
+ transport.handleResponse(requests.get(0).requestId, new TestResponse());
assertTrue(listener.isDone());
listener.get();
assertPhase(task, "finished");
@@ -539,16 +542,17 @@ public class TransportReplicationActionTests extends ESTestCase {
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build();
setState(clusterService, state);
Request request = new Request(shardId).timeout("1ms");
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
AtomicBoolean executed = new AtomicBoolean();
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
- createReplicatedOperation(Request request,
- ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
- TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
+ createReplicatedOperation(
+ Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
+ TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@@ -579,7 +583,7 @@ public class TransportReplicationActionTests extends ESTestCase {
fail("releasable is closed twice");
}
};
- Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
+ TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request();
Request replicaRequest = (Request) primary.perform(request).replicaRequest;
@@ -596,7 +600,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
public void testReplicaProxy() throws InterruptedException, ExecutionException {
- Action.ReplicasProxy proxy = action.new ReplicasProxy();
+ ReplicationOperation.Replicas proxy = action.newReplicasProxy();
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
@@ -636,43 +640,15 @@ public class TransportReplicationActionTests extends ESTestCase {
assertListenerThrows("listener should reflect remote error", listener, TransportException.class);
}
- AtomicReference<Throwable> failure = new AtomicReference<>();
- AtomicReference<Throwable> ignoredFailure = new AtomicReference<>();
+ AtomicReference<Object> failure = new AtomicReference<>();
+ AtomicReference<Object> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
- proxy.failShard(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
- () -> success.set(true), failure::set, ignoredFailure::set
+ proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
+ () -> success.set(true), failure::set, ignoredFailure::set
);
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
- assertEquals(1, shardFailedRequests.length);
- CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
- ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
- // the shard the request was sent to and the shard to be failed should be the same
- assertEquals(shardEntry.getShardId(), replica.shardId());
- assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
- if (randomBoolean()) {
- // simulate success
- transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
- assertTrue(success.get());
- assertNull(failure.get());
- assertNull(ignoredFailure.get());
-
- } else if (randomBoolean()) {
- // simulate the primary has been demoted
- transport.handleRemoteError(shardFailedRequest.requestId,
- new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(),
- "shard-failed-test"));
- assertFalse(success.get());
- assertNotNull(failure.get());
- assertNull(ignoredFailure.get());
-
- } else {
- // simulated an "ignored" exception
- transport.handleRemoteError(shardFailedRequest.requestId,
- new NodeClosedException(state.nodes().getLocalNode()));
- assertFalse(success.get());
- assertNull(failure.get());
- assertNotNull(ignoredFailure.get());
- }
+ // A plain replication action (non-write) does not fail the shard, so no shard-failed request is sent
+ assertEquals(0, shardFailedRequests.length);
}
public void testShadowIndexDisablesReplication() throws Exception {
@@ -691,9 +667,9 @@ public class TransportReplicationActionTests extends ESTestCase {
action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(),
createTransportChannel(new PlainActionFuture<>()), null) {
@Override
- protected ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> createReplicatedOperation(
- Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
- TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
+ protected ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> createReplicatedOperation(
+ Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
+ TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertFalse(executeOnReplicas);
assertFalse(executed.getAndSet(true));
@@ -715,7 +691,7 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request = new Request(shardId);
TransportReplicationAction.ConcreteShardRequest<Request> concreteShardRequest =
new TransportReplicationAction.ConcreteShardRequest<>(request, routingEntry.allocationId().getId());
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final IndexShard shard = mock(IndexShard.class);
@@ -730,10 +706,10 @@ public class TransportReplicationActionTests extends ESTestCase {
}
};
- Action action =
- new Action(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool);
+ TestAction action =
+ new TestAction(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool);
- TransportReplicationAction<Request, Request, Response>.PrimaryOperationTransportHandler primaryPhase =
+ TransportReplicationAction<Request, Request, TestResponse>.PrimaryOperationTransportHandler primaryPhase =
action.new PrimaryOperationTransportHandler();
primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
@@ -751,7 +727,7 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state());
final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard();
Request request = new Request(shardId);
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
int i = randomInt(3);
final boolean throwExceptionOnCreation = i == 1;
@@ -759,11 +735,12 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean respondWithError = i == 3;
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
- protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
- createReplicatedOperation(Request request,
- ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
- TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
- boolean executeOnReplicas) {
+ protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
+ createReplicatedOperation(
+ Request request,
+ ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
+ TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
+ boolean executeOnReplicas) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
@@ -808,7 +785,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
boolean throwException = randomBoolean();
final ReplicationTask task = maybeTask();
- Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
+ TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
@@ -820,7 +797,7 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
- final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
+ final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(
@@ -871,7 +848,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
setState(clusterService, state(index, true, ShardRoutingState.STARTED));
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Request request = new Request(shardId).timeout("1ms");
action.new PrimaryOperationTransportHandler().messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"),
@@ -897,7 +874,7 @@ public class TransportReplicationActionTests extends ESTestCase {
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build();
setState(clusterService, state);
- PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Request request = new Request(shardId).timeout("1ms");
action.new ReplicaOperationTransportHandler().messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"),
@@ -928,7 +905,7 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, state);
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
- Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
+ TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
@@ -939,8 +916,8 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
- final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
- final PlainActionFuture<Response> listener = new PlainActionFuture<>();
+ final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
+ final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
replicaOperationTransportHandler.messageReceived(
@@ -1047,31 +1024,46 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
- static class Response extends ReplicationResponse {
+ static class TestResponse extends ReplicationResponse {
}
- class Action extends TransportReplicationAction<Request, Request, Response> {
+ private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
- Action(Settings settings, String actionName, TransportService transportService,
- ClusterService clusterService,
- ShardStateAction shardStateAction,
- ThreadPool threadPool) {
+ private final boolean withDocumentFailureOnPrimary;
+ private final boolean withDocumentFailureOnReplica;
+
+ TestAction(Settings settings, String actionName, TransportService transportService,
+ ClusterService clusterService, ShardStateAction shardStateAction,
+ ThreadPool threadPool) {
+ super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
+ shardStateAction,
+ new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
+ Request::new, Request::new, ThreadPool.Names.SAME);
+ this.withDocumentFailureOnPrimary = false;
+ this.withDocumentFailureOnReplica = false;
+ }
+
+ TestAction(Settings settings, String actionName, TransportService transportService,
+ ClusterService clusterService, ShardStateAction shardStateAction,
+ ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
Request::new, Request::new, ThreadPool.Names.SAME);
+ this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
+ this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
@Override
- protected Response newResponseInstance() {
- return new Response();
+ protected TestResponse newResponseInstance() {
+ return new TestResponse();
}
@Override
protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception {
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary";
- return new PrimaryResult(shardRequest, new Response());
+ return new PrimaryResult(shardRequest, new TestResponse());
}
@Override
@@ -1156,22 +1148,23 @@ public class TransportReplicationActionTests extends ESTestCase {
return indexShard;
}
- class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> {
- NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult<Request, Response>> listener) {
+ class NoopReplicationOperation extends ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> {
+
+ NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener) {
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override
public void execute() throws Exception {
// Using the diamond operator (<>) prevents Eclipse from being able to compile this code
- this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, Response>(null, new Response()));
+ this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, TestResponse>(null, new TestResponse()));
}
}
/**
* Transport channel that is needed for replica operation testing.
*/
- public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
+ public TransportChannel createTransportChannel(final PlainActionFuture<TestResponse> listener) {
return new TransportChannel() {
@Override
@@ -1186,12 +1179,12 @@ public class TransportReplicationActionTests extends ESTestCase {
@Override
public void sendResponse(TransportResponse response) throws IOException {
- listener.onResponse(((Response) response));
+ listener.onResponse(((TestResponse) response));
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
- listener.onResponse(((Response) response));
+ listener.onResponse(((TestResponse) response));
}
@Override
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index 82cd013c19..781059fd85 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -19,38 +19,117 @@
package org.elasticsearch.action.support.replication;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.action.support.replication.ReplicationOperation;
+import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor;
import java.util.HashSet;
+import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TransportWriteActionTests extends ESTestCase {
+
+ private static ThreadPool threadPool;
+
+ private ClusterService clusterService;
private IndexShard indexShard;
private Translog.Location location;
+    /** Creates the static thread pool that the tests in this class share. */
+    @BeforeClass
+    public static void beforeClass() {
+        final String poolName = "ShardReplicationTests";
+        threadPool = new TestThreadPool(poolName);
+    }
+
@Before
public void initCommonMocks() { // per-test setup: fresh shard/translog mocks plus a new cluster service
indexShard = mock(IndexShard.class);
location = mock(Translog.Location.class);
+ clusterService = createClusterService(threadPool); // built on the static thread pool; closed again in tearDown()
+ }
+
+    /**
+     * Runs the superclass tear-down and then closes the cluster service created in
+     * {@code initCommonMocks()}. The close sits in a {@code finally} block so the
+     * service is still released when the superclass tear-down throws.
+     *
+     * @throws Exception if the superclass tear-down or closing the cluster service fails
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            clusterService.close();
+        }
+    }
+
+    /**
+     * Terminates the static thread pool, passing a 30 second timeout, and then clears
+     * the static reference to it.
+     */
+    @AfterClass
+    public static void afterClass() {
+        final TimeUnit unit = TimeUnit.SECONDS;
+        ThreadPool.terminate(threadPool, 30, unit);
+        threadPool = null;
+    }
+
+    /**
+     * Asserts that waiting on {@code listener} fails with an {@link ExecutionException}
+     * whose cause is an instance of {@code klass}.
+     *
+     * @param msg      failure message reported when {@code listener.get()} returns normally
+     * @param listener the future to wait on
+     * @param klass    the expected type of the exception's cause
+     * @throws InterruptedException if the wait on the future is interrupted
+     */
+    <T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
+        ExecutionException thrown = null;
+        try {
+            listener.get();
+        } catch (ExecutionException e) {
+            thrown = e;
+        }
+        if (thrown == null) {
+            // the future completed normally, which is the failure case for this helper
+            fail(msg);
+        }
+        assertThat(thrown.getCause(), instanceOf(klass));
     }
public void testPrimaryNoRefreshCall() throws Exception {
@@ -176,6 +255,95 @@ public class TransportWriteActionTests extends ESTestCase {
assertNotNull(listener.failure);
}
+    /**
+     * Exercises the replicas proxy returned by {@code TestAction#newReplicasProxy()} against a
+     * {@link CapturingTransport}:
+     * <ol>
+     *   <li>performing an operation on a routing whose node id is not in the cluster state must fail
+     *       the listener with a {@link NoNodeAvailableException};</li>
+     *   <li>performing an operation on an assigned replica must send exactly one request, and the
+     *       listener must reflect a successful response, a remote error or a transport error;</li>
+     *   <li>{@code failShardIfNeeded} must send exactly one shard-failed request for that replica, and
+     *       the success / primary-demoted / ignored-failure callbacks must match the simulated reply.</li>
+     * </ol>
+     */
+    public void testReplicaProxy() throws InterruptedException, ExecutionException {
+        CapturingTransport transport = new CapturingTransport();
+        TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+        transportService.start();
+        transportService.acceptIncomingRequests();
+        ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
+        TestAction action = new TestAction(Settings.EMPTY, "testAction", transportService,
+            clusterService, shardStateAction, threadPool);
+        ReplicationOperation.Replicas proxy = action.newReplicasProxy();
+        final String index = "test";
+        final ShardId shardId = new ShardId(index, "_na_", 0);
+        ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
+        logger.info("using state: {}", state);
+        ClusterServiceUtils.setState(clusterService, state);
+
+        // check that an unknown node fails
+        PlainActionFuture<ReplicaResponse> listener = new PlainActionFuture<>();
+        proxy.performOn(
+            TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())),
+            new TestRequest(), listener);
+        assertTrue(listener.isDone());
+        assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class);
+
+        // pick a random replica that is assigned to a node and send it a request
+        final IndexShardRoutingTable shardRoutings = state.routingTable().shardRoutingTable(shardId);
+        final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
+            .filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
+        listener = new PlainActionFuture<>();
+        proxy.performOn(replica, new TestRequest(), listener);
+        // nothing has answered the captured request yet, so the listener must still be pending
+        assertFalse(listener.isDone());
+
+        CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
+        assertThat(captures, arrayWithSize(1));
+        if (randomBoolean()) {
+            // simulate a successful replica response
+            final TransportReplicationAction.ReplicaResponse response =
+                new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong());
+            transport.handleResponse(captures[0].requestId, response);
+            assertTrue(listener.isDone());
+            assertThat(listener.get(), equalTo(response));
+        } else if (randomBoolean()) {
+            // simulate an error raised on the remote side
+            transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated"));
+            assertTrue(listener.isDone());
+            assertListenerThrows("listener should reflect remote error", listener, ElasticsearchException.class);
+        } else {
+            // simulate a transport-level error
+            transport.handleError(captures[0].requestId, new TransportException("simulated"));
+            assertTrue(listener.isDone());
+            assertListenerThrows("listener should reflect remote error", listener, TransportException.class);
+        }
+
+        AtomicReference<Object> failure = new AtomicReference<>();
+        AtomicReference<Object> ignoredFailure = new AtomicReference<>();
+        AtomicBoolean success = new AtomicBoolean();
+        proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
+            () -> success.set(true), failure::set, ignoredFailure::set
+        );
+        CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
+        // A write replication action proxy should fail the shard
+        assertEquals(1, shardFailedRequests.length);
+        CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
+        ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
+        // the shard the request was sent to and the shard to be failed should be the same
+        assertEquals(shardEntry.getShardId(), replica.shardId());
+        assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
+        if (randomBoolean()) {
+            // simulate success
+            transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
+            assertTrue(success.get());
+            assertNull(failure.get());
+            assertNull(ignoredFailure.get());
+
+        } else if (randomBoolean()) {
+            // simulate the primary has been demoted
+            transport.handleRemoteError(shardFailedRequest.requestId,
+                new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(),
+                    "shard-failed-test"));
+            assertFalse(success.get());
+            assertNotNull(failure.get());
+            assertNull(ignoredFailure.get());
+
+        } else {
+            // simulate an "ignored" exception
+            transport.handleRemoteError(shardFailedRequest.requestId,
+                new NodeClosedException(state.nodes().getLocalNode()));
+            assertFalse(success.get());
+            assertNull(failure.get());
+            assertNotNull(ignoredFailure.get());
+        }
+    }
+
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
private final boolean withDocumentFailureOnPrimary;
@@ -184,6 +352,7 @@ public class TransportWriteActionTests extends ESTestCase {
protected TestAction() {
this(false, false);
}
+
protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(Settings.EMPTY, "test",
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null,
@@ -193,6 +362,17 @@ public class TransportWriteActionTests extends ESTestCase {
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
+ protected TestAction(Settings settings, String actionName, TransportService transportService,
+ ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { // variant that forwards caller-supplied services to the superclass
+ super(settings, actionName, transportService, clusterService,
+ mockIndicesService(clusterService), threadPool, shardStateAction, // indices service is a Mockito mock that reads the cluster service's state
+ new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
+ TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
+ this.withDocumentFailureOnPrimary = false; // both document-failure flags are switched off for this variant
+ this.withDocumentFailureOnReplica = false;
+ }
+
+
@Override
protected TestResponse newResponseInstance() {
return new TestResponse();
@@ -222,6 +402,80 @@ public class TransportWriteActionTests extends ESTestCase {
}
}
+    /**
+     * Builds a Mockito {@link IndexService} whose {@code getShard(int)} returns a mocked shard for
+     * every shard number of the given index and throws {@link ShardNotFoundException} otherwise.
+     *
+     * @param indexMetaData  metadata of the index the service stands for; supplies the index and its shard count
+     * @param clusterService cluster service handed on to {@code mockIndexShard}
+     * @return the mocked index service
+     */
+    final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
+        final IndexService indexService = mock(IndexService.class);
+        when(indexService.getShard(anyInt())).then(invocation -> {
+            int shard = (Integer) invocation.getArguments()[0];
+            final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
+            // shard numbers are 0-based, so the shard count itself is already out of range
+            if (shard >= indexMetaData.getNumberOfShards()) {
+                throw new ShardNotFoundException(shardId);
+            }
+            return mockIndexShard(shardId, clusterService);
+        });
+        return indexService;
+    }
+
+    /**
+     * Builds a Mockito {@link IndicesService} that resolves indices against the current state of
+     * {@code clusterService} at call time.
+     * <ul>
+     *   <li>{@code indexServiceSafe(Index)} looks the index up with {@code getIndexSafe} and wraps
+     *       the result in a mocked index service;</li>
+     *   <li>{@code indexService(Index)} does the same when the state's metadata has an index of
+     *       that name, and returns {@code null} otherwise.</li>
+     * </ul>
+     *
+     * @param clusterService source of the cluster state consulted on every stubbed call
+     * @return the mocked indices service
+     */
+    final IndicesService mockIndicesService(ClusterService clusterService) {
+        final IndicesService indicesService = mock(IndicesService.class);
+        when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
+            Index index = (Index) invocation.getArguments()[0];
+            final ClusterState state = clusterService.state();
+            final IndexMetaData indexSafe = state.metaData().getIndexSafe(index);
+            return mockIndexService(indexSafe, clusterService);
+        });
+        when(indicesService.indexService(any(Index.class))).then(invocation -> {
+            Index index = (Index) invocation.getArguments()[0];
+            final ClusterState state = clusterService.state();
+            if (state.metaData().hasIndex(index.getName())) {
+                // resolve from the same state snapshot that passed the hasIndex check
+                final IndexMetaData indexSafe = state.metaData().getIndexSafe(index);
+                return mockIndexService(indexSafe, clusterService);
+            } else {
+                return null;
+            }
+        });
+        return indicesService;
+    }
+
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ private final AtomicBoolean isRelocated = new AtomicBoolean(false);
+
+    /**
+     * Builds a Mockito {@link IndexShard} for {@code shardId} with these stubs:
+     * <ul>
+     *   <li>both operation-lock methods increment {@code count} and answer the listener with a
+     *       releasable that decrements it again; the replica variant first rejects a term lower
+     *       than the shard's primary term;</li>
+     *   <li>{@code routingEntry()} is read from the local node's routing in the current cluster state;</li>
+     *   <li>{@code state()} follows the {@code isRelocated} flag;</li>
+     *   <li>{@code failShard} throws an {@link AssertionError};</li>
+     *   <li>{@code getPrimaryTerm()} is read from the index metadata in the current cluster state.</li>
+     * </ul>
+     *
+     * @param shardId        the shard the mock stands for
+     * @param clusterService source of the cluster state consulted by the stubs
+     * @return the mocked shard
+     */
+    private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
+        final IndexShard shard = mock(IndexShard.class);
+        // primary lock: count the operation in and hand back a releasable that counts it out
+        doAnswer(invocation -> {
+            final Object[] args = invocation.getArguments();
+            ActionListener<Releasable> onAcquired = (ActionListener<Releasable>) args[0];
+            count.incrementAndGet();
+            onAcquired.onResponse(count::decrementAndGet);
+            return null;
+        }).when(shard).acquirePrimaryOperationLock(any(ActionListener.class), anyString());
+        // replica lock: same bookkeeping, but an operation term below the current primary term is rejected
+        doAnswer(invocation -> {
+            final Object[] args = invocation.getArguments();
+            long operationTerm = (Long) args[0];
+            ActionListener<Releasable> onAcquired = (ActionListener<Releasable>) args[1];
+            final long currentTerm = shard.getPrimaryTerm();
+            if (operationTerm < currentTerm) {
+                throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
+                    shardId, operationTerm, currentTerm));
+            }
+            count.incrementAndGet();
+            onAcquired.onResponse(count::decrementAndGet);
+            return null;
+        }).when(shard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString());
+        // routing entry: looked up on the local node each time it is asked for
+        when(shard.routingEntry()).thenAnswer(invocationOnMock -> {
+            final ClusterState current = clusterService.state();
+            final RoutingNode localNode = current.getRoutingNodes().node(current.nodes().getLocalNodeId());
+            final ShardRouting entry = localNode.getByShardId(shardId);
+            if (entry == null) {
+                throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node");
+            }
+            return entry;
+        });
+        when(shard.state()).thenAnswer(invocationOnMock -> {
+            if (isRelocated.get()) {
+                return IndexShardState.RELOCATED;
+            }
+            return IndexShardState.STARTED;
+        });
+        doThrow(new AssertionError("failed shard is not supported")).when(shard).failShard(anyString(), any(Exception.class));
+        when(shard.getPrimaryTerm()).thenAnswer(i -> {
+            final IndexMetaData metaData = clusterService.state().metaData().getIndexSafe(shardId.getIndex());
+            return metaData.primaryTerm(shardId.id());
+        });
+        return shard;
+    }
+
private static class TestRequest extends ReplicatedWriteRequest<TestRequest> {
TestRequest() {
setShardId(new ShardId("test", "test", 1));