diff options
author | Simon Willnauer <simon.willnauer@elasticsearch.com> | 2016-09-16 09:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-16 09:47:53 +0200 |
commit | f5daa165f12a9ef84006ba16d89c8baf1efe4b94 (patch) | |
tree | da2d949b515df12cb7abaf21efc84c1e31804661 /core/src/test/java/org/elasticsearch/action/ingest | |
parent | 577dcb32374cf2528a5deadf0a57f1ea1d5a9cbd (diff) |
Remove ability to plug-in TransportService (#20505)
TransportService is such a central part of the core server, replacing
it's implementation is risky and can cause serious issues. This change removes the ability to
plug in TransportService but allows registering a TransportInterceptor that enables
plugins to intercept requests on both the sender and the receiver ends. This is a commonly used
and overwritten functionality but encapsulates the custom code in a contained manner.
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java | 117 |
1 files changed, 74 insertions, 43 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java index a602465197..50bd3771bc 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java @@ -33,29 +33,30 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.hamcrest.CustomTypeSafeMatcher; -import org.mockito.stubbing.Answer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -67,7 +68,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { private TransportService transportService; @SuppressWarnings("unchecked") - private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes) { + private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) { ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name")); DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(); DiscoveryNode localNode = null; @@ -88,7 +89,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); when(clusterService.localNode()).thenReturn(localNode); when(clusterService.state()).thenReturn(clusterState.build()); - transportService = mock(TransportService.class); + transportService = new TransportService(Settings.EMPTY, null, null, interceptor); return new IngestProxyActionFilter(clusterService, transportService); } @@ -97,7 +98,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(0, totalNodes); + IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action; ActionRequest request; @@ -114,7 +115,6 @@ public class IngestProxyActionFilterTests extends ESTestCase { } catch(IllegalStateException e) { assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node.")); } - verifyZeroInteractions(transportService); verifyZeroInteractions(actionFilterChain); verifyZeroInteractions(actionListener); } @@ -124,7 +124,8 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes, + TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action; ActionRequest request; @@ -136,7 +137,6 @@ public class IngestProxyActionFilterTests extends ESTestCase { request = new BulkRequest().add(new IndexRequest()); } filter.apply(task, action, request, actionListener, actionFilterChain); - verifyZeroInteractions(transportService); verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener)); verifyZeroInteractions(actionListener); } @@ -147,11 +147,11 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); ActionRequest request = mock(ActionRequest.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes, + TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action = randomAsciiOfLengthBetween(1, 20); filter.apply(task, action, request, actionListener, actionFilterChain); - verifyZeroInteractions(transportService); verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener)); verifyZeroInteractions(actionListener); } @@ -162,19 +162,31 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleResponse(new IndexResponse()); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, IndexAction.NAME); + handler.handleResponse((T) new IndexResponse()); + } + }; + } + }); IndexRequest indexRequest = new IndexRequest().setPipeline("_id"); filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class)); verifyZeroInteractions(actionFilterChain); + assertTrue(run.get()); verify(actionListener).onResponse(any(IndexResponse.class)); verify(actionListener, never()).onFailure(any(TransportException.class)); } @@ -185,13 +197,24 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleResponse(new BulkResponse(null, -1)); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, BulkAction.NAME); + handler.handleResponse((T) new BulkResponse(null, -1)); + } + }; + } + }); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(new IndexRequest().setPipeline("_id")); @@ -200,11 +223,10 @@ public class IngestProxyActionFilterTests extends ESTestCase { bulkRequest.add(new IndexRequest()); } filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain); - - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(BulkAction.NAME), same(bulkRequest), any(TransportResponseHandler.class)); verifyZeroInteractions(actionFilterChain); verify(actionListener).onResponse(any(BulkResponse.class)); verify(actionListener, never()).onFailure(any(TransportException.class)); + assertTrue(run.get()); } @SuppressWarnings("unchecked") @@ -213,30 +235,39 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleException(new TransportException(new IllegalArgumentException())); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); - - String action; + String requestAction; ActionRequest request; if (randomBoolean()) { - action = IndexAction.NAME; + requestAction = IndexAction.NAME; request = new IndexRequest().setPipeline("_id"); } else { - action = BulkAction.NAME; + requestAction = BulkAction.NAME; request = new BulkRequest().add(new IndexRequest().setPipeline("_id")); } - - filter.apply(task, action, request, actionListener, actionFilterChain); - - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(action), same(request), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, requestAction); + handler.handleException(new TransportException(new IllegalArgumentException())); + } + }; + } + }); + filter.apply(task, requestAction, request, actionListener, actionFilterChain); verifyZeroInteractions(actionFilterChain); verify(actionListener).onFailure(any(TransportException.class)); verify(actionListener, never()).onResponse(any(TransportResponse.class)); + assertTrue(run.get()); + } private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> { |