diff options
author | Jason Tedor <jason@tedor.me> | 2017-05-30 10:05:11 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-30 10:05:11 -0400 |
commit | 9957bdf0ad88e3625ef42c95a80d5edc3aac02f4 (patch) | |
tree | 5ee01cc71f02c969bfdec8c13f4a5abb4d91b29a /core/src/test/java/org/elasticsearch/action/support | |
parent | 491dc1186ae3a0f01a921d7761c51a8e6b040d50 (diff) |
Handle primary failure handling replica response
Today, if the primary throws an exception while handling the replica
response (e.g., because it is already closed while updating the local
checkpoint for the replica, or because of a bug that causes an
exception to be thrown in the replica operation listener), this exception
is caught by the underlying transport handler plumbing and is translated
into a response handler failure transport exception that is passed to
the onFailure method of the replica operation listener. This causes the
primary to turn around and fail the replica, which is a disastrous and
incorrect outcome: there's nothing wrong with the replica; it is the
primary that is broken and deserves a paddlin'. This commit handles this
situation by failing the primary.
Relates #24926
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9fcc8c2435..88cf5769a4 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -56,7 +57,9 @@ import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -191,8 +194,7 @@ public class ReplicationOperationTests extends ESTestCase { assertTrue(primaryFailed.compareAndSet(false, true)); } }; - final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, - () -> finalState); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, () -> finalState); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -299,6 +301,53 @@ public class ReplicationOperationTests 
extends ESTestCase { } } + public void testPrimaryFailureHandlingReplicaResponse() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + final Request request = new Request(shardId); + + final ClusterState state = stateWithActivePrimary(index, true, 1, 0); + final IndexMetaData indexMetaData = state.getMetaData().index(index); + final long primaryTerm = indexMetaData.primaryTerm(0); + final ShardRouting primaryRouting = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + + final boolean fatal = randomBoolean(); + final AtomicBoolean primaryFailed = new AtomicBoolean(); + final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary = new TestPrimary(primaryRouting, primaryTerm) { + + @Override + public void failShard(String message, Exception exception) { + primaryFailed.set(true); + } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + if (primaryRouting.allocationId().getId().equals(allocationId)) { + super.updateLocalCheckpointForShard(allocationId, checkpoint); + } else { + if (fatal) { + throw new NullPointerException(); + } else { + throw new AlreadyClosedException("already closed"); + } + } + } + + }; + + final PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>(); + final ReplicationOperation.Replicas<Request> replicas = new TestReplicaProxy(Collections.emptyMap()); + TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, () -> state); + operation.execute(); + + assertThat(primaryFailed.get(), equalTo(fatal)); + final ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(0)); + assertThat(shardInfo.getFailures(), arrayWithSize(0)); + assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size())); + } + private Set<ShardRouting> getExpectedReplicas(ShardId shardId, 
ClusterState state) { Set<ShardRouting> expectedReplicas = new HashSet<>(); String localNodeId = state.nodes().getLocalNodeId(); |