diff options
author | Jason Tedor <jason@tedor.me> | 2016-09-28 23:32:06 +0200 |
---|---|---|
committer | Jason Tedor <jason@tedor.me> | 2016-09-29 00:22:31 +0200 |
commit | 25fd9e26c4fb042d6f6dea14efcd239577bdbbc4 (patch) | |
tree | 0fb107a1ef29edb32226f421779d424818493abc /core/src/test/java/org/elasticsearch/action/ingest | |
parent | c809671eb3a4ee113e337bc5f63ce5283017a17a (diff) | |
parent | 85402d5220d86ecebf6f2332a9743e717016ce24 (diff) |
Merge branch 'master' into feature/seq_no
* master: (1199 commits)
[DOCS] Remove non-valid link to mapping migration document
Revert "Default `include_in_all` for numeric-like types to false"
test: add a test with ipv6 address
docs: clearify that both ip4 and ip6 addresses are supported
Include complex settings in settings requests
Add production warning for pre-release builds
Clean up confusing error message on unhandled endpoint
[TEST] Increase logging level in testDelayShards()
change health from string to enum (#20661)
Provide error message when plugin id is missing
Document that sliced scroll works for reindex
Make reindex-from-remote ignore unknown fields
Remove NoopGatewayAllocator in favor of a more realistic mock (#20637)
Remove Marvel character reference from guide
Fix documentation for setting Java I/O temp dir
Update client benchmarks to log4j2
Changes the API of GatewayAllocator#applyStartedShards and (#20642)
Removes FailedRerouteAllocation and StartedRerouteAllocation
IndexRoutingTable.initializeEmpty shouldn't override supplied primary RecoverySource (#20638)
Smoke tester: Adjust to latest changes (#20611)
...
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest')
10 files changed, 205 insertions, 92 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index b04533fafc..1316c87e2a 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -162,7 +162,7 @@ public class IngestActionFilterTests extends ESTestCase { PipelineStore store = mock(PipelineStore.class); Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2")); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor))); executionService = new PipelineExecutionService(store, threadPool); IngestService ingestService = mock(IngestService.class); when(ingestService.getPipelineExecutionService()).thenReturn(executionService); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java index 3d1a1a1c69..50bd3771bc 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java @@ -33,29 +33,30 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.hamcrest.CustomTypeSafeMatcher; -import org.mockito.stubbing.Answer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -67,7 +68,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { private TransportService transportService; @SuppressWarnings("unchecked") - private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes) { + private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) { ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name")); DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(); DiscoveryNode localNode = null; @@ -79,7 +80,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { roles.add(DiscoveryNode.Role.INGEST); } DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, LocalTransportAddress.buildUnique(), attributes, roles, VersionUtils.randomVersion(random())); - builder.put(node); + builder.add(node); if (i == totalNodes - 1) { localNode = node; } @@ -88,7 +89,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); when(clusterService.localNode()).thenReturn(localNode); when(clusterService.state()).thenReturn(clusterState.build()); - transportService = mock(TransportService.class); + transportService = new TransportService(Settings.EMPTY, null, null, interceptor); return new IngestProxyActionFilter(clusterService, transportService); } @@ -97,7 +98,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(0, totalNodes); + IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action; ActionRequest request; @@ -114,7 +115,6 @@ public class IngestProxyActionFilterTests extends ESTestCase { } catch(IllegalStateException e) { assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node.")); } - verifyZeroInteractions(transportService); verifyZeroInteractions(actionFilterChain); verifyZeroInteractions(actionListener); } @@ -124,7 +124,8 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes, + TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action; ActionRequest request; @@ -136,7 +137,6 @@ public class IngestProxyActionFilterTests extends ESTestCase { request = new BulkRequest().add(new IndexRequest()); } filter.apply(task, action, request, actionListener, actionFilterChain); - verifyZeroInteractions(transportService); verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener)); verifyZeroInteractions(actionListener); } @@ -147,11 +147,11 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); ActionRequest request = mock(ActionRequest.class); int totalNodes = randomIntBetween(1, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes, + TransportService.NOOP_TRANSPORT_INTERCEPTOR); String action = randomAsciiOfLengthBetween(1, 20); filter.apply(task, action, request, actionListener, actionFilterChain); - verifyZeroInteractions(transportService); verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener)); verifyZeroInteractions(actionListener); } @@ -162,19 +162,31 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleResponse(new IndexResponse()); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, IndexAction.NAME); + handler.handleResponse((T) new IndexResponse()); + } + }; + } + }); IndexRequest indexRequest = new IndexRequest().setPipeline("_id"); filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class)); verifyZeroInteractions(actionFilterChain); + assertTrue(run.get()); verify(actionListener).onResponse(any(IndexResponse.class)); verify(actionListener, never()).onFailure(any(TransportException.class)); } @@ -185,13 +197,24 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleResponse(new BulkResponse(null, -1)); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, BulkAction.NAME); + handler.handleResponse((T) new BulkResponse(null, -1)); + } + }; + } + }); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(new IndexRequest().setPipeline("_id")); @@ -200,11 +223,10 @@ public class IngestProxyActionFilterTests extends ESTestCase { bulkRequest.add(new IndexRequest()); } filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain); - - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(BulkAction.NAME), same(bulkRequest), any(TransportResponseHandler.class)); verifyZeroInteractions(actionFilterChain); verify(actionListener).onResponse(any(BulkResponse.class)); verify(actionListener, never()).onFailure(any(TransportException.class)); + assertTrue(run.get()); } @SuppressWarnings("unchecked") @@ -213,30 +235,39 @@ public class IngestProxyActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); int totalNodes = randomIntBetween(2, 5); - IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes); - Answer<Void> answer = invocationOnMock -> { - TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3]; - transportResponseHandler.handleException(new TransportException(new IllegalArgumentException())); - return null; - }; - doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class)); - - String action; + String requestAction; ActionRequest request; if (randomBoolean()) { - action = IndexAction.NAME; + requestAction = IndexAction.NAME; request = new IndexRequest().setPipeline("_id"); } else { - action = BulkAction.NAME; + requestAction = BulkAction.NAME; request = new BulkRequest().add(new IndexRequest().setPipeline("_id")); } - - filter.apply(task, action, request, actionListener, actionFilterChain); - - verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(action), same(request), any(TransportResponseHandler.class)); + AtomicBoolean run = new AtomicBoolean(false); + IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes, + new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler<T> handler) { + assertTrue(run.compareAndSet(false, true)); + assertTrue(node.isIngestNode()); + assertEquals(action, requestAction); + handler.handleException(new TransportException(new IllegalArgumentException())); + } + }; + } + }); + filter.apply(task, requestAction, request, actionListener, actionFilterChain); verifyZeroInteractions(actionFilterChain); verify(actionListener).onFailure(any(TransportException.class)); verify(actionListener, never()).onResponse(any(TransportResponse.class)); + assertTrue(run.get()); + } private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java index 544e2932b4..83aad26f6a 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 5806d8c312..a4320d2641 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -34,7 +34,7 @@ import org.junit.Before; import java.util.Collections; import java.util.Map; -import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -43,6 +43,8 @@ import static org.hamcrest.Matchers.sameInstance; public class SimulateExecutionServiceTests extends ESTestCase { + private final Integer version = randomBoolean() ? randomInt() : null; + private ThreadPool threadPool; private SimulateExecutionService executionService; private IngestDocument ingestDocument; @@ -65,7 +67,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -90,7 +92,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); @@ -103,7 +105,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {}); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); @@ -127,7 +129,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {}); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); - Pipeline pipeline = new Pipeline("_id", "_description", + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); @@ -145,7 +147,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument))); IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument); - Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata(); + Map<String, Object> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata(); metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock"); metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0"); metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed"); @@ -160,9 +162,26 @@ public class SimulateExecutionServiceTests extends ESTestCase { } public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception { - TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); }); + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; }); + CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); + SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); + SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; + assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument))); + assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata()))); + } + + public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception { + TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -177,7 +196,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteItemWithFailure() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); - Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor)); + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index fa4bdc6525..ab5d30c6f9 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -19,19 +19,6 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.ingest.ProcessorsRegistry; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -40,6 +27,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields; import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID; import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; @@ -58,13 +54,12 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { public void init() throws IOException { TestProcessor processor = new TestProcessor(ingestDocument -> {}); CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor); - Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor); - ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder(); - processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class))); - ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class)); + Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor); + Map<String, Processor.Factory> registry = + Collections.singletonMap("mock_processor", (factories, tag, config) -> processor); store = mock(PipelineStore.class); when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); - when(store.getProcessorRegistry()).thenReturn(processorRegistry); + when(store.getProcessorFactories()).thenReturn(registry); } public void testParseUsingPipelineStore() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java new file mode 100644 index 0000000000..0966010a8f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class SimulatePipelineRequestTests extends ESTestCase { + + public void testSerialization() throws IOException { + SimulatePipelineRequest request = new SimulatePipelineRequest(new BytesArray("")); + // Sometimes we set an id + if (randomBoolean()) { + request.setId(randomAsciiOfLengthBetween(1, 10)); + } + + // Sometimes we explicitly set a boolean (with whatever value) + if (randomBoolean()) { + request.setVerbose(randomBoolean()); + } + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(); + otherRequest.readFrom(streamInput); + + assertThat(otherRequest.getId(), equalTo(request.getId())); + assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose())); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java index 576e8e0172..ad308b01bf 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java @@ -30,7 +30,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; @@ -39,6 +39,7 @@ public class SimulatePipelineResponseTests extends ESTestCase { public void testSerialization() throws IOException { boolean isVerbose = randomBoolean(); + String id = randomBoolean() ? randomAsciiOfLengthBetween(1, 10) : null; int numResults = randomIntBetween(1, 10); List<SimulateDocumentResult> results = new ArrayList<>(numResults); for (int i = 0; i < numResults; i++) { @@ -70,7 +71,7 @@ public class SimulatePipelineResponseTests extends ESTestCase { } } - SimulatePipelineResponse response = new SimulatePipelineResponse(randomAsciiOfLengthBetween(1, 10), isVerbose, results); + SimulatePipelineResponse response = new SimulatePipelineResponse(id, isVerbose, results); BytesStreamOutput out = new BytesStreamOutput(); response.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java index ccf3a67494..75d2d5834f 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -37,13 +37,18 @@ public class SimulateProcessorResultTests extends ESTestCase { public void testSerialization() throws IOException { String processorTag = randomAsciiOfLengthBetween(1, 10); - boolean isFailure = randomBoolean(); + boolean isSuccessful = randomBoolean(); + boolean isIgnoredException = randomBoolean(); SimulateProcessorResult simulateProcessorResult; - if (isFailure) { - simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); - } else { + if (isSuccessful) { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); + if (isIgnoredException) { + simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test")); + } else { + simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); + } + } else { + simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); } BytesStreamOutput out = new BytesStreamOutput(); @@ -51,13 +56,20 @@ public class SimulateProcessorResultTests extends ESTestCase { StreamInput streamInput = out.bytes().streamInput(); SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput); assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag())); - if (isFailure) { - assertThat(simulateProcessorResult.getIngestDocument(), is(nullValue())); + if (isSuccessful) { + assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument()); + if (isIgnoredException) { + assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class)); + IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure(); + assertThat(e.getMessage(), equalTo("test")); + } else { + assertThat(otherSimulateProcessorResult.getFailure(), nullValue()); + } + } else { + assertThat(otherSimulateProcessorResult.getIngestDocument(), is(nullValue())); assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class)); IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure(); assertThat(e.getMessage(), equalTo("test")); - } else { - assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument()); } } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java index 5b0a059909..3572a04529 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java @@ -39,6 +39,7 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TY import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class TrackingResultProcessorTests extends ESTestCase { @@ -110,7 +111,7 @@ public class TrackingResultProcessorTests extends ESTestCase { assertThat(resultList.get(0).getFailure(), equalTo(exception)); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); @@ -142,7 +143,7 @@ public class TrackingResultProcessorTests extends ESTestCase { assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getFailure(), sameInstance(exception)); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java index b4908846e9..bc72094558 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.Map; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; -import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -47,7 +47,7 @@ public class WriteableIngestDocumentTests extends ESTestCase { for (int i = 0; i < numFields; i++) { sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10)); } - Map<String, String> ingestMetadata = new HashMap<>(); + Map<String, Object> ingestMetadata = new HashMap<>(); numFields = randomIntBetween(1, 5); for (int i = 0; i < numFields; i++) { ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10)); @@ -70,7 +70,7 @@ public class WriteableIngestDocumentTests extends ESTestCase { changed = true; } - Map<String, String> otherIngestMetadata; + Map<String, Object> otherIngestMetadata; if (randomBoolean()) { otherIngestMetadata = new HashMap<>(); numFields = randomIntBetween(1, 5); @@ -103,7 +103,7 @@ public class WriteableIngestDocumentTests extends ESTestCase { for (int i = 0; i < numFields; i++) { sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10)); } - Map<String, String> ingestMetadata = new HashMap<>(); + Map<String, Object> ingestMetadata = new HashMap<>(); numFields = randomIntBetween(1, 5); for (int i = 0; i < numFields; i++) { ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10)); @@ -131,7 +131,7 @@ public class WriteableIngestDocumentTests extends ESTestCase { Map<String, Object> toXContentDoc = (Map<String, Object>) toXContentMap.get("doc"); Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source"); - Map<String, String> toXContentIngestMetadata = (Map<String, String>) toXContentDoc.get("_ingest"); + Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest"); Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata(); for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) { |