summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/ingest
diff options
context:
space:
mode:
authorSimon Willnauer <simon.willnauer@elasticsearch.com>2016-09-16 09:47:53 +0200
committerGitHub <noreply@github.com>2016-09-16 09:47:53 +0200
commitf5daa165f12a9ef84006ba16d89c8baf1efe4b94 (patch)
treeda2d949b515df12cb7abaf21efc84c1e31804661 /core/src/test/java/org/elasticsearch/action/ingest
parent577dcb32374cf2528a5deadf0a57f1ea1d5a9cbd (diff)
Remove ability to plug-in TransportService (#20505)
TransportService is such a central part of the core server, replacing it's implementation is risky and can cause serious issues. This change removes the ability to plug in TransportService but allows registering a TransportInterceptor that enables plugins to intercept requests on both the sender and the receiver ends. This is a commonly used and overwritten functionality but encapsulates the custom code in a contained manner.
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest')
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java117
1 files changed, 74 insertions, 43 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
index a602465197..50bd3771bc 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
@@ -33,29 +33,30 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomTypeSafeMatcher;
-import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -67,7 +68,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
private TransportService transportService;
@SuppressWarnings("unchecked")
- private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes) {
+ private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) {
ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name"));
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder();
DiscoveryNode localNode = null;
@@ -88,7 +89,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.state()).thenReturn(clusterState.build());
- transportService = mock(TransportService.class);
+ transportService = new TransportService(Settings.EMPTY, null, null, interceptor);
return new IngestProxyActionFilter(clusterService, transportService);
}
@@ -97,7 +98,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(0, totalNodes);
+ IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
@@ -114,7 +115,6 @@ public class IngestProxyActionFilterTests extends ESTestCase {
} catch(IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node."));
}
- verifyZeroInteractions(transportService);
verifyZeroInteractions(actionFilterChain);
verifyZeroInteractions(actionListener);
}
@@ -124,7 +124,8 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
@@ -136,7 +137,6 @@ public class IngestProxyActionFilterTests extends ESTestCase {
request = new BulkRequest().add(new IndexRequest());
}
filter.apply(task, action, request, actionListener, actionFilterChain);
- verifyZeroInteractions(transportService);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
@@ -147,11 +147,11 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
ActionRequest request = mock(ActionRequest.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action = randomAsciiOfLengthBetween(1, 20);
filter.apply(task, action, request, actionListener, actionFilterChain);
- verifyZeroInteractions(transportService);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
@@ -162,19 +162,31 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleResponse(new IndexResponse());
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, IndexAction.NAME);
+ handler.handleResponse((T) new IndexResponse());
+ }
+ };
+ }
+ });
IndexRequest indexRequest = new IndexRequest().setPipeline("_id");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class));
verifyZeroInteractions(actionFilterChain);
+ assertTrue(run.get());
verify(actionListener).onResponse(any(IndexResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
}
@@ -185,13 +197,24 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleResponse(new BulkResponse(null, -1));
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, BulkAction.NAME);
+ handler.handleResponse((T) new BulkResponse(null, -1));
+ }
+ };
+ }
+ });
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest().setPipeline("_id"));
@@ -200,11 +223,10 @@ public class IngestProxyActionFilterTests extends ESTestCase {
bulkRequest.add(new IndexRequest());
}
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
-
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(BulkAction.NAME), same(bulkRequest), any(TransportResponseHandler.class));
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onResponse(any(BulkResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
+ assertTrue(run.get());
}
@SuppressWarnings("unchecked")
@@ -213,30 +235,39 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleException(new TransportException(new IllegalArgumentException()));
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
-
- String action;
+ String requestAction;
ActionRequest request;
if (randomBoolean()) {
- action = IndexAction.NAME;
+ requestAction = IndexAction.NAME;
request = new IndexRequest().setPipeline("_id");
} else {
- action = BulkAction.NAME;
+ requestAction = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
-
- filter.apply(task, action, request, actionListener, actionFilterChain);
-
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(action), same(request), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, requestAction);
+ handler.handleException(new TransportException(new IllegalArgumentException()));
+ }
+ };
+ }
+ });
+ filter.apply(task, requestAction, request, actionListener, actionFilterChain);
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onFailure(any(TransportException.class));
verify(actionListener, never()).onResponse(any(TransportResponse.class));
+ assertTrue(run.get());
+
}
private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> {