summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/support
diff options
context:
space:
mode:
authorLee Hinman <lee@writequit.org>2017-05-17 16:17:33 -0600
committerLee Hinman <lee@writequit.org>2017-05-24 13:31:30 -0600
commit7a6db074ee95f1df7d4f678ea44332e9f655f4c8 (patch)
treebabdb22c22fa520dd825051be2fcf2833e631683 /core/src/test/java/org/elasticsearch/action/support
parenta64937db7aba2d4c6568052b07c532141a09db8d (diff)
[TEST] Add test for retrying replica operations with real network
Related to #24745
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java67
1 files changed, 67 insertions, 0 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index 89026d9d1d..f91fab381d 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -49,10 +50,14 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
@@ -63,12 +68,16 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.MockTcpTransport;
+import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@@ -82,10 +91,13 @@ import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -918,6 +930,61 @@ public class TransportReplicationActionTests extends ESTestCase {
assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId());
}
+ public void testRetryOnReplicaWithRealTransport() throws Exception {
+ final ShardId shardId = new ShardId("test", "_na_", 0);
+ final ClusterState initialState = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
+ final ShardRouting replica = initialState.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
+ // simulate execution of the node holding the replica
+ final ClusterState stateWithNodes = ClusterState.builder(initialState)
+ .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(replica.currentNodeId())).build();
+ setState(clusterService, stateWithNodes);
+ AtomicBoolean throwException = new AtomicBoolean(true);
+ final ReplicationTask task = maybeTask();
+ NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
+ final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+ new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
+ Version.CURRENT);
+ transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+ x -> clusterService.localNode(),null);
+ transportService.start();
+ transportService.acceptIncomingRequests();
+
+ AtomicBoolean calledSuccessfully = new AtomicBoolean(false);
+ TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
+ threadPool) {
+ @Override
+ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
+ assertPhase(task, "replica");
+ if (throwException.get()) {
+ throw new RetryOnReplicaException(shardId, "simulation");
+ }
+ calledSuccessfully.set(true);
+ return new ReplicaResult();
+ }
+ };
+ final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
+ final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
+ final Request request = new Request().setShardId(shardId);
+ final long checkpoint = randomNonNegativeLong();
+ request.primaryTerm(stateWithNodes.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
+ replicaOperationTransportHandler.messageReceived(
+ new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), checkpoint),
+ createTransportChannel(listener), task);
+ if (listener.isDone()) {
+ listener.get(); // fail with the exception if there
+ fail("listener shouldn't be done");
+ }
+
+ // release the waiting
+ throwException.set(false);
+ // publish a new state (same as the old state with the version incremented)
+ setState(clusterService, stateWithNodes);
+
+ // Assert that the request was retried, this time successfull
+ assertTrue("action should have been successfully called on retry but was not", calledSuccessfully.get());
+ transportService.stop();
+ }
+
private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) {
final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest =
(TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest;