summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/support
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2017-05-30 10:05:11 -0400
committerGitHub <noreply@github.com>2017-05-30 10:05:11 -0400
commit9957bdf0ad88e3625ef42c95a80d5edc3aac02f4 (patch)
tree5ee01cc71f02c969bfdec8c13f4a5abb4d91b29a /core/src/test/java/org/elasticsearch/action/support
parent491dc1186ae3a0f01a921d7761c51a8e6b040d50 (diff)
Handle primary failure handling replica response
Today if the primary throws an exception while handling the replica response (e.g., because it is already closed while updating the local checkpoint for the replica), or because of a bug that causes an exception to be thrown in the replica operation listener, this exception is caught by the underlying transport handler plumbing and is translated into a response handler failure transport exception that is passed to the onFailure method of the replica operation listener. This causes the primary to turn around and fail the replica which is a disastrous and incorrect outcome as there's nothing wrong with the replica, it is the primary that is broken and deserves a paddlin'. This commit handles this situation by failing the primary. Relates #24926
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java53
1 files changed, 51 insertions, 2 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
index 9fcc8c2435..88cf5769a4 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
@@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
@@ -56,7 +57,9 @@ import java.util.function.Supplier;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -191,8 +194,7 @@ public class ReplicationOperationTests extends ESTestCase {
assertTrue(primaryFailed.compareAndSet(false, true));
}
};
- final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy,
- () -> finalState);
+ final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, () -> finalState);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@@ -299,6 +301,53 @@ public class ReplicationOperationTests extends ESTestCase {
}
}
+ public void testPrimaryFailureHandlingReplicaResponse() throws Exception {
+ final String index = "test";
+ final ShardId shardId = new ShardId(index, "_na_", 0);
+
+ final Request request = new Request(shardId);
+
+ final ClusterState state = stateWithActivePrimary(index, true, 1, 0);
+ final IndexMetaData indexMetaData = state.getMetaData().index(index);
+ final long primaryTerm = indexMetaData.primaryTerm(0);
+ final ShardRouting primaryRouting = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
+
+ final boolean fatal = randomBoolean();
+ final AtomicBoolean primaryFailed = new AtomicBoolean();
+ final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary = new TestPrimary(primaryRouting, primaryTerm) {
+
+ @Override
+ public void failShard(String message, Exception exception) {
+ primaryFailed.set(true);
+ }
+
+ @Override
+ public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
+ if (primaryRouting.allocationId().getId().equals(allocationId)) {
+ super.updateLocalCheckpointForShard(allocationId, checkpoint);
+ } else {
+ if (fatal) {
+ throw new NullPointerException();
+ } else {
+ throw new AlreadyClosedException("already closed");
+ }
+ }
+ }
+
+ };
+
+ final PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
+ final ReplicationOperation.Replicas<Request> replicas = new TestReplicaProxy(Collections.emptyMap());
+ TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, () -> state);
+ operation.execute();
+
+ assertThat(primaryFailed.get(), equalTo(fatal));
+ final ShardInfo shardInfo = listener.actionGet().getShardInfo();
+ assertThat(shardInfo.getFailed(), equalTo(0));
+ assertThat(shardInfo.getFailures(), arrayWithSize(0));
+ assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size()));
+ }
+
private Set<ShardRouting> getExpectedReplicas(ShardId shardId, ClusterState state) {
Set<ShardRouting> expectedReplicas = new HashSet<>();
String localNodeId = state.nodes().getLocalNodeId();