summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/ingest
diff options
context:
space:
mode:
authorRyan Ernst <ryan@iernst.net>2016-12-07 08:43:26 -0800
committerGitHub <noreply@github.com>2016-12-07 08:43:26 -0800
commitf02a2b65460aae92c8bf81564be5449971867e97 (patch)
tree7dc51f6cfc69c0c192f2dc9423c4afaed8720bb0 /core/src/test/java/org/elasticsearch/action/ingest
parent8006b105f3727f9ddeff0f20f3bd7ac7b482ef6a (diff)
Ingest: Moved ingest invocation into index/bulk actions (#22015)
* Ingest: Moved ingest invocation into index/bulk actions Ingest was originally setup as a plugin, and in order to hook into the index and bulk actions, action filters were used. However, ingest was later moved into core, but the action filters were never removed. This change moves the execution of ingest into the index and bulk actions. * Address PR comments * Remove forwarder direct dependency on ClusterService
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest')
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java167
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java235
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java271
3 files changed, 0 insertions, 673 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java
deleted file mode 100644
index 33dd755e7f..0000000000
--- a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package org.elasticsearch.action.ingest;
-
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.test.ESTestCase;
-import org.hamcrest.Matchers;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.mockito.Mockito.mock;
-
-public class BulkRequestModifierTests extends ESTestCase {
-
- public void testBulkRequestModifier() {
- int numRequests = scaledRandomIntBetween(8, 64);
- BulkRequest bulkRequest = new BulkRequest();
- for (int i = 0; i < numRequests; i++) {
- bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
- }
- CaptureActionListener actionListener = new CaptureActionListener();
- IngestActionFilter.BulkRequestModifier bulkRequestModifier = new IngestActionFilter.BulkRequestModifier(bulkRequest);
-
- int i = 0;
- Set<Integer> failedSlots = new HashSet<>();
- while (bulkRequestModifier.hasNext()) {
- bulkRequestModifier.next();
- if (randomBoolean()) {
- bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
- failedSlots.add(i);
- }
- i++;
- }
-
- assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
- // simulate that we actually executed the modified bulk request:
- long ingestTook = randomLong();
- ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTook, actionListener);
- result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
-
- BulkResponse bulkResponse = actionListener.getResponse();
- assertThat(bulkResponse.getIngestTookInMillis(), equalTo(ingestTook));
- for (int j = 0; j < bulkResponse.getItems().length; j++) {
- if (failedSlots.contains(j)) {
- BulkItemResponse item = bulkResponse.getItems()[j];
- assertThat(item.isFailed(), is(true));
- assertThat(item.getFailure().getIndex(), equalTo("_index"));
- assertThat(item.getFailure().getType(), equalTo("_type"));
- assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
- assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
- } else {
- assertThat(bulkResponse.getItems()[j], nullValue());
- }
- }
- }
-
- public void testPipelineFailures() {
- BulkRequest originalBulkRequest = new BulkRequest();
- for (int i = 0; i < 32; i++) {
- originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
- }
-
- IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
- for (int i = 0; modifier.hasNext(); i++) {
- modifier.next();
- if (i % 2 == 0) {
- modifier.markCurrentItemAsFailed(new RuntimeException());
- }
- }
-
- // So half of the requests have "failed", so only the successful requests are left:
- BulkRequest bulkRequest = modifier.getBulkRequest();
- assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
-
- List<BulkItemResponse> responses = new ArrayList<>();
- ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<BulkResponse>() {
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- responses.addAll(Arrays.asList(bulkItemResponses.getItems()));
- }
-
- @Override
- public void onFailure(Exception e) {
- }
- });
-
- List<BulkItemResponse> originalResponses = new ArrayList<>();
- for (DocWriteRequest actionRequest : bulkRequest.requests()) {
- IndexRequest indexRequest = (IndexRequest) actionRequest;
- IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), indexRequest.id(), 1, 1, true);
- originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse));
- }
- bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));
-
- assertThat(responses.size(), Matchers.equalTo(32));
- for (int i = 0; i < 32; i++) {
- assertThat(responses.get(i).getId(), Matchers.equalTo(String.valueOf(i)));
- }
- }
-
- public void testNoFailures() {
- BulkRequest originalBulkRequest = new BulkRequest();
- for (int i = 0; i < 32; i++) {
- originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
- }
-
- IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
- while (modifier.hasNext()) {
- modifier.next();
- }
-
- BulkRequest bulkRequest = modifier.getBulkRequest();
- assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
- @SuppressWarnings("unchecked")
- ActionListener<BulkResponse> actionListener = mock(ActionListener.class);
- assertThat(modifier.wrapActionListenerIfNeeded(1L, actionListener).getClass().isAnonymousClass(), is(true));
- }
-
- private static class CaptureActionListener implements ActionListener<BulkResponse> {
-
- private BulkResponse response;
-
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- this.response = bulkItemResponses;
- }
-
- @Override
- public void onFailure(Exception e) {
- }
-
- public BulkResponse getResponse() {
- return response;
- }
- }
-}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java
deleted file mode 100644
index 68a79c345e..0000000000
--- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.ingest;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexAction;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.ActionFilterChain;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.ingest.CompoundProcessor;
-import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.ingest.Pipeline;
-import org.elasticsearch.ingest.PipelineExecutionService;
-import org.elasticsearch.ingest.PipelineStore;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.ingest.TestProcessor;
-import org.elasticsearch.node.service.NodeService;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Before;
-import org.mockito.stubbing.Answer;
-
-import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class IngestActionFilterTests extends ESTestCase {
-
- private IngestActionFilter filter;
- private PipelineExecutionService executionService;
-
- @Before
- public void setup() {
- executionService = mock(PipelineExecutionService.class);
- IngestService ingestService = mock(IngestService.class);
- when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
- NodeService nodeService = mock(NodeService.class);
- when(nodeService.getIngestService()).thenReturn(ingestService);
- filter = new IngestActionFilter(Settings.EMPTY, nodeService);
- }
-
- public void testApplyNoPipelineId() throws Exception {
- IndexRequest indexRequest = new IndexRequest();
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
-
- verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
- verifyZeroInteractions(executionService, actionFilterChain);
- }
-
- public void testApplyBulkNoPipelineId() throws Exception {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.add(new IndexRequest());
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
-
- verify(actionFilterChain).proceed(task, BulkAction.NAME, bulkRequest, actionListener);
- verifyZeroInteractions(executionService, actionFilterChain);
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyIngestIdViaRequestParam() throws Exception {
- Task task = mock(Task.class);
- IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
- indexRequest.source("field", "value");
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
-
- verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
- verifyZeroInteractions(actionFilterChain);
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyExecuted() throws Exception {
- Task task = mock(Task.class);
- IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
- indexRequest.source("field", "value");
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- Answer answer = invocationOnMock -> {
- @SuppressWarnings("unchecked")
- Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[2];
- listener.accept(true);
- return null;
- };
- doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
-
- verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
- verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
- verifyZeroInteractions(actionListener);
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyFailed() throws Exception {
- Task task = mock(Task.class);
- IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
- indexRequest.source("field", "value");
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- RuntimeException exception = new RuntimeException();
- Answer answer = invocationOnMock -> {
- Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[1];
- handler.accept(exception);
- return null;
- };
- doAnswer(answer).when(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
-
- verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
- verify(actionListener).onFailure(exception);
- verifyZeroInteractions(actionFilterChain);
- }
-
- public void testApplyWithBulkRequest() throws Exception {
- Task task = mock(Task.class);
- ThreadPool threadPool = mock(ThreadPool.class);
- final ExecutorService executorService = EsExecutors.newDirectExecutorService();
- when(threadPool.executor(any())).thenReturn(executorService);
- PipelineStore store = mock(PipelineStore.class);
-
- Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));
- when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor)));
- executionService = new PipelineExecutionService(store, threadPool);
- IngestService ingestService = mock(IngestService.class);
- when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
- NodeService nodeService = mock(NodeService.class);
- when(nodeService.getIngestService()).thenReturn(ingestService);
- filter = new IngestActionFilter(Settings.EMPTY, nodeService);
-
- BulkRequest bulkRequest = new BulkRequest();
- int numRequest = scaledRandomIntBetween(8, 64);
- for (int i = 0; i < numRequest; i++) {
- if (rarely()) {
- DocWriteRequest request;
- if (randomBoolean()) {
- request = new DeleteRequest("_index", "_type", "_id");
- } else {
- request = new UpdateRequest("_index", "_type", "_id");
- }
- bulkRequest.add(request);
- } else {
- IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
- indexRequest.source("field1", "value1");
- bulkRequest.add(indexRequest);
- }
- }
-
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
- verify(actionFilterChain).proceed(eq(task), eq(BulkAction.NAME), eq(bulkRequest), any());
- verifyZeroInteractions(actionListener);
-
- int assertedRequests = 0;
- for (DocWriteRequest actionRequest : bulkRequest.requests()) {
- if (actionRequest instanceof IndexRequest) {
- IndexRequest indexRequest = (IndexRequest) actionRequest;
- assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
- assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1"));
- assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2"));
- }
- assertedRequests++;
- }
- assertThat(assertedRequests, equalTo(numRequest));
- }
-
- @SuppressWarnings("unchecked")
- public void testIndexApiSinglePipelineExecution() {
- Answer answer = invocationOnMock -> {
- @SuppressWarnings("unchecked")
- Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[2];
- listener.accept(true);
- return null;
- };
- doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
-
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
-
- IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id").source("field", "value");
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
- assertThat(indexRequest.getPipeline(), nullValue());
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
- verify(executionService, times(1)).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
- verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener);
- }
-}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
deleted file mode 100644
index 85240c4a8e..0000000000
--- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.ingest;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexAction;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.support.ActionFilterChain;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.VersionUtils;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportInterceptor;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseHandler;
-import org.elasticsearch.transport.TransportService;
-import org.hamcrest.CustomTypeSafeMatcher;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class IngestProxyActionFilterTests extends ESTestCase {
-
- private TransportService transportService;
-
- @SuppressWarnings("unchecked")
- private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) {
- ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name"));
- DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder();
- DiscoveryNode localNode = null;
- for (int i = 0; i < totalNodes; i++) {
- String nodeId = "node" + i;
- Map<String, String> attributes = new HashMap<>();
- Set<DiscoveryNode.Role> roles = new HashSet<>();
- if (i < ingestNodes) {
- roles.add(DiscoveryNode.Role.INGEST);
- }
- DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, buildNewFakeTransportAddress(), attributes, roles, VersionUtils.randomVersion(random()));
- builder.add(node);
- if (i == totalNodes - 1) {
- localNode = node;
- }
- }
- clusterState.nodes(builder);
- ClusterService clusterService = mock(ClusterService.class);
- when(clusterService.localNode()).thenReturn(localNode);
- when(clusterService.state()).thenReturn(clusterState.build());
- transportService = new TransportService(Settings.EMPTY, null, null, interceptor, null);
- return new IngestProxyActionFilter(clusterService, transportService);
- }
-
- public void testApplyNoIngestNodes() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
-
- String action;
- ActionRequest request;
- if (randomBoolean()) {
- action = IndexAction.NAME;
- request = new IndexRequest().setPipeline("_id");
- } else {
- action = BulkAction.NAME;
- request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
- }
- try {
- filter.apply(task, action, request, actionListener, actionFilterChain);
- fail("should have failed because there are no ingest nodes");
- } catch(IllegalStateException e) {
- assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node."));
- }
- verifyZeroInteractions(actionFilterChain);
- verifyZeroInteractions(actionListener);
- }
-
- public void testApplyNoPipelineId() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
- TransportService.NOOP_TRANSPORT_INTERCEPTOR);
-
- String action;
- ActionRequest request;
- if (randomBoolean()) {
- action = IndexAction.NAME;
- request = new IndexRequest();
- } else {
- action = BulkAction.NAME;
- request = new BulkRequest().add(new IndexRequest());
- }
- filter.apply(task, action, request, actionListener, actionFilterChain);
- verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
- verifyZeroInteractions(actionListener);
- }
-
- public void testApplyAnyAction() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- ActionRequest request = mock(ActionRequest.class);
- int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
- TransportService.NOOP_TRANSPORT_INTERCEPTOR);
-
- String action = randomAsciiOfLengthBetween(1, 20);
- filter.apply(task, action, request, actionListener, actionFilterChain);
- verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
- verifyZeroInteractions(actionListener);
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyIndexRedirect() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- int totalNodes = randomIntBetween(2, 5);
- AtomicBoolean run = new AtomicBoolean(false);
-
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
- new TransportInterceptor() {
- @Override
- public AsyncSender interceptSender(AsyncSender sender) {
- return new AsyncSender() {
- @Override
- public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
- TransportRequestOptions options,
- TransportResponseHandler<T> handler) {
- assertTrue(run.compareAndSet(false, true));
- assertTrue(node.isIngestNode());
- assertEquals(action, IndexAction.NAME);
- handler.handleResponse((T) new IndexResponse());
- }
- };
- }
- });
-
- IndexRequest indexRequest = new IndexRequest().setPipeline("_id");
- filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
-
- verifyZeroInteractions(actionFilterChain);
- assertTrue(run.get());
- verify(actionListener).onResponse(any(IndexResponse.class));
- verify(actionListener, never()).onFailure(any(TransportException.class));
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyBulkRedirect() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- int totalNodes = randomIntBetween(2, 5);
- AtomicBoolean run = new AtomicBoolean(false);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
- new TransportInterceptor() {
- @Override
- public AsyncSender interceptSender(AsyncSender sender) {
- return new AsyncSender() {
- @Override
- public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
- TransportRequestOptions options,
- TransportResponseHandler<T> handler) {
- assertTrue(run.compareAndSet(false, true));
- assertTrue(node.isIngestNode());
- assertEquals(action, BulkAction.NAME);
- handler.handleResponse((T) new BulkResponse(null, -1));
- }
- };
- }
- });
-
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.add(new IndexRequest().setPipeline("_id"));
- int numNoPipelineRequests = randomIntBetween(0, 10);
- for (int i = 0; i < numNoPipelineRequests; i++) {
- bulkRequest.add(new IndexRequest());
- }
- filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
- verifyZeroInteractions(actionFilterChain);
- verify(actionListener).onResponse(any(BulkResponse.class));
- verify(actionListener, never()).onFailure(any(TransportException.class));
- assertTrue(run.get());
- }
-
- @SuppressWarnings("unchecked")
- public void testApplyFailures() {
- Task task = mock(Task.class);
- ActionListener actionListener = mock(ActionListener.class);
- ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
- int totalNodes = randomIntBetween(2, 5);
- String requestAction;
- ActionRequest request;
- if (randomBoolean()) {
- requestAction = IndexAction.NAME;
- request = new IndexRequest().setPipeline("_id");
- } else {
- requestAction = BulkAction.NAME;
- request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
- }
- AtomicBoolean run = new AtomicBoolean(false);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
- new TransportInterceptor() {
- @Override
- public AsyncSender interceptSender(AsyncSender sender) {
- return new AsyncSender() {
- @Override
- public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
- TransportRequestOptions options,
- TransportResponseHandler<T> handler) {
- assertTrue(run.compareAndSet(false, true));
- assertTrue(node.isIngestNode());
- assertEquals(action, requestAction);
- handler.handleException(new TransportException(new IllegalArgumentException()));
- }
- };
- }
- });
- filter.apply(task, requestAction, request, actionListener, actionFilterChain);
- verifyZeroInteractions(actionFilterChain);
- verify(actionListener).onFailure(any(TransportException.class));
- verify(actionListener, never()).onResponse(any(TransportResponse.class));
- assertTrue(run.get());
-
- }
-}