summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/ingest
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2016-09-28 23:32:06 +0200
committerJason Tedor <jason@tedor.me>2016-09-29 00:22:31 +0200
commit25fd9e26c4fb042d6f6dea14efcd239577bdbbc4 (patch)
tree0fb107a1ef29edb32226f421779d424818493abc /core/src/test/java/org/elasticsearch/action/ingest
parentc809671eb3a4ee113e337bc5f63ce5283017a17a (diff)
parent85402d5220d86ecebf6f2332a9743e717016ce24 (diff)
Merge branch 'master' into feature/seq_no
* master: (1199 commits) [DOCS] Remove non-valid link to mapping migration document Revert "Default `include_in_all` for numeric-like types to false" test: add a test with ipv6 address docs: clearify that both ip4 and ip6 addresses are supported Include complex settings in settings requests Add production warning for pre-release builds Clean up confusing error message on unhandled endpoint [TEST] Increase logging level in testDelayShards() change health from string to enum (#20661) Provide error message when plugin id is missing Document that sliced scroll works for reindex Make reindex-from-remote ignore unknown fields Remove NoopGatewayAllocator in favor of a more realistic mock (#20637) Remove Marvel character reference from guide Fix documentation for setting Java I/O temp dir Update client benchmarks to log4j2 Changes the API of GatewayAllocator#applyStartedShards and (#20642) Removes FailedRerouteAllocation and StartedRerouteAllocation IndexRoutingTable.initializeEmpty shouldn't override supplied primary RecoverySource (#20638) Smoke tester: Adjust to latest changes (#20611) ...
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest')
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java119
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java37
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java31
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java54
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java32
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java10
10 files changed, 205 insertions, 92 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java
index b04533fafc..1316c87e2a 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java
@@ -162,7 +162,7 @@ public class IngestActionFilterTests extends ESTestCase {
PipelineStore store = mock(PipelineStore.class);
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));
- when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
+ when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool);
IngestService ingestService = mock(IngestService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
index 3d1a1a1c69..50bd3771bc 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java
@@ -33,29 +33,30 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomTypeSafeMatcher;
-import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -67,7 +68,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
private TransportService transportService;
@SuppressWarnings("unchecked")
- private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes) {
+ private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) {
ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name"));
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder();
DiscoveryNode localNode = null;
@@ -79,7 +80,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
roles.add(DiscoveryNode.Role.INGEST);
}
DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, LocalTransportAddress.buildUnique(), attributes, roles, VersionUtils.randomVersion(random()));
- builder.put(node);
+ builder.add(node);
if (i == totalNodes - 1) {
localNode = node;
}
@@ -88,7 +89,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.state()).thenReturn(clusterState.build());
- transportService = mock(TransportService.class);
+ transportService = new TransportService(Settings.EMPTY, null, null, interceptor);
return new IngestProxyActionFilter(clusterService, transportService);
}
@@ -97,7 +98,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(0, totalNodes);
+ IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
@@ -114,7 +115,6 @@ public class IngestProxyActionFilterTests extends ESTestCase {
} catch(IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node."));
}
- verifyZeroInteractions(transportService);
verifyZeroInteractions(actionFilterChain);
verifyZeroInteractions(actionListener);
}
@@ -124,7 +124,8 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
@@ -136,7 +137,6 @@ public class IngestProxyActionFilterTests extends ESTestCase {
request = new BulkRequest().add(new IndexRequest());
}
filter.apply(task, action, request, actionListener, actionFilterChain);
- verifyZeroInteractions(transportService);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
@@ -147,11 +147,11 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
ActionRequest request = mock(ActionRequest.class);
int totalNodes = randomIntBetween(1, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action = randomAsciiOfLengthBetween(1, 20);
filter.apply(task, action, request, actionListener, actionFilterChain);
- verifyZeroInteractions(transportService);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
@@ -162,19 +162,31 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleResponse(new IndexResponse());
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, IndexAction.NAME);
+ handler.handleResponse((T) new IndexResponse());
+ }
+ };
+ }
+ });
IndexRequest indexRequest = new IndexRequest().setPipeline("_id");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class));
verifyZeroInteractions(actionFilterChain);
+ assertTrue(run.get());
verify(actionListener).onResponse(any(IndexResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
}
@@ -185,13 +197,24 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleResponse(new BulkResponse(null, -1));
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, BulkAction.NAME);
+ handler.handleResponse((T) new BulkResponse(null, -1));
+ }
+ };
+ }
+ });
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest().setPipeline("_id"));
@@ -200,11 +223,10 @@ public class IngestProxyActionFilterTests extends ESTestCase {
bulkRequest.add(new IndexRequest());
}
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
-
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(BulkAction.NAME), same(bulkRequest), any(TransportResponseHandler.class));
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onResponse(any(BulkResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
+ assertTrue(run.get());
}
@SuppressWarnings("unchecked")
@@ -213,30 +235,39 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
- IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
- Answer<Void> answer = invocationOnMock -> {
- TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
- transportResponseHandler.handleException(new TransportException(new IllegalArgumentException()));
- return null;
- };
- doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
-
- String action;
+ String requestAction;
ActionRequest request;
if (randomBoolean()) {
- action = IndexAction.NAME;
+ requestAction = IndexAction.NAME;
request = new IndexRequest().setPipeline("_id");
} else {
- action = BulkAction.NAME;
+ requestAction = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
-
- filter.apply(task, action, request, actionListener, actionFilterChain);
-
- verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(action), same(request), any(TransportResponseHandler.class));
+ AtomicBoolean run = new AtomicBoolean(false);
+ IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
+ new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
+ TransportRequestOptions options,
+ TransportResponseHandler<T> handler) {
+ assertTrue(run.compareAndSet(false, true));
+ assertTrue(node.isIngestNode());
+ assertEquals(action, requestAction);
+ handler.handleException(new TransportException(new IllegalArgumentException()));
+ }
+ };
+ }
+ });
+ filter.apply(task, requestAction, request, actionListener, actionFilterChain);
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onFailure(any(TransportException.class));
verify(actionListener, never()).onResponse(any(TransportResponse.class));
+ assertTrue(run.get());
+
}
private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> {
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java
index 544e2932b4..83aad26f6a 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java
@@ -27,7 +27,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
-import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
index 5806d8c312..a4320d2641 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
@@ -34,7 +34,7 @@ import org.junit.Before;
import java.util.Collections;
import java.util.Map;
-import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
@@ -43,6 +43,8 @@ import static org.hamcrest.Matchers.sameInstance;
public class SimulateExecutionServiceTests extends ESTestCase {
+ private final Integer version = randomBoolean() ? randomInt() : null;
+
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private IngestDocument ingestDocument;
@@ -65,7 +67,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
- Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@@ -90,7 +92,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
- Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
@@ -103,7 +105,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
- Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3));
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
@@ -127,7 +129,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
- Pipeline pipeline = new Pipeline("_id", "_description",
+ Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
@@ -145,7 +147,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument);
- Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
+ Map<String, Object> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
@@ -160,9 +162,26 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
- TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
+ RuntimeException exception = new RuntimeException("processor failed");
+ TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
+ CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
+ SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+ assertThat(testProcessor.getInvokedCounter(), equalTo(1));
+ assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
+ SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
+ assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
+ assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
+ assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
+ assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
+ assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
+ assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
+ }
+
+ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
+ TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
- Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@@ -177,7 +196,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
- Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
+ Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
index fa4bdc6525..ab5d30c6f9 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
@@ -19,19 +19,6 @@
package org.elasticsearch.action.ingest;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.ingest.PipelineStore;
-import org.elasticsearch.ingest.ProcessorsRegistry;
-import org.elasticsearch.ingest.TestProcessor;
-import org.elasticsearch.ingest.TestTemplateService;
-import org.elasticsearch.ingest.CompoundProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Pipeline;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.test.ESTestCase;
-import org.junit.Before;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -40,6 +27,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.elasticsearch.ingest.CompoundProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.ingest.PipelineStore;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.TestProcessor;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
@@ -58,13 +54,12 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
- Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
- ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
- processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class)));
- ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class));
+ Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
+ Map<String, Processor.Factory> registry =
+ Collections.singletonMap("mock_processor", (factories, tag, config) -> processor);
store = mock(PipelineStore.class);
when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
- when(store.getProcessorRegistry()).thenReturn(processorRegistry);
+ when(store.getProcessorFactories()).thenReturn(registry);
}
public void testParseUsingPipelineStore() throws Exception {
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java
new file mode 100644
index 0000000000..0966010a8f
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.ingest;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class SimulatePipelineRequestTests extends ESTestCase {
+
+ public void testSerialization() throws IOException {
+ SimulatePipelineRequest request = new SimulatePipelineRequest(new BytesArray(""));
+ // Sometimes we set an id
+ if (randomBoolean()) {
+ request.setId(randomAsciiOfLengthBetween(1, 10));
+ }
+
+ // Sometimes we explicitly set a boolean (with whatever value)
+ if (randomBoolean()) {
+ request.setVerbose(randomBoolean());
+ }
+
+ BytesStreamOutput out = new BytesStreamOutput();
+ request.writeTo(out);
+ StreamInput streamInput = out.bytes().streamInput();
+ SimulatePipelineRequest otherRequest = new SimulatePipelineRequest();
+ otherRequest.readFrom(streamInput);
+
+ assertThat(otherRequest.getId(), equalTo(request.getId()));
+ assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose()));
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java
index 576e8e0172..ad308b01bf 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java
@@ -30,7 +30,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -39,6 +39,7 @@ public class SimulatePipelineResponseTests extends ESTestCase {
public void testSerialization() throws IOException {
boolean isVerbose = randomBoolean();
+ String id = randomBoolean() ? randomAsciiOfLengthBetween(1, 10) : null;
int numResults = randomIntBetween(1, 10);
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
for (int i = 0; i < numResults; i++) {
@@ -70,7 +71,7 @@ public class SimulatePipelineResponseTests extends ESTestCase {
}
}
- SimulatePipelineResponse response = new SimulatePipelineResponse(randomAsciiOfLengthBetween(1, 10), isVerbose, results);
+ SimulatePipelineResponse response = new SimulatePipelineResponse(id, isVerbose, results);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java
index ccf3a67494..75d2d5834f 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java
@@ -27,7 +27,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
-import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -37,13 +37,18 @@ public class SimulateProcessorResultTests extends ESTestCase {
public void testSerialization() throws IOException {
String processorTag = randomAsciiOfLengthBetween(1, 10);
- boolean isFailure = randomBoolean();
+ boolean isSuccessful = randomBoolean();
+ boolean isIgnoredException = randomBoolean();
SimulateProcessorResult simulateProcessorResult;
- if (isFailure) {
- simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
- } else {
+ if (isSuccessful) {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
- simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
+ if (isIgnoredException) {
+ simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test"));
+ } else {
+ simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
+ }
+ } else {
+ simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
}
BytesStreamOutput out = new BytesStreamOutput();
@@ -51,13 +56,20 @@ public class SimulateProcessorResultTests extends ESTestCase {
StreamInput streamInput = out.bytes().streamInput();
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput);
assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag()));
- if (isFailure) {
- assertThat(simulateProcessorResult.getIngestDocument(), is(nullValue()));
+ if (isSuccessful) {
+ assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument());
+ if (isIgnoredException) {
+ assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
+ IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure();
+ assertThat(e.getMessage(), equalTo("test"));
+ } else {
+ assertThat(otherSimulateProcessorResult.getFailure(), nullValue());
+ }
+ } else {
+ assertThat(otherSimulateProcessorResult.getIngestDocument(), is(nullValue()));
assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
- } else {
- assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument());
}
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java
index 5b0a059909..3572a04529 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java
@@ -39,6 +39,7 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TY
import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
public class TrackingResultProcessorTests extends ESTestCase {
@@ -110,7 +111,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
- Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
+ Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
@@ -142,7 +143,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
- assertThat(resultList.get(0).getFailure(), nullValue());
+ assertThat(resultList.get(0).getFailure(), sameInstance(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
index b4908846e9..bc72094558 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
@@ -34,7 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.elasticsearch.ingest.IngestDocumentTests.assertIngestDocument;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -47,7 +47,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
for (int i = 0; i < numFields; i++) {
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
}
- Map<String, String> ingestMetadata = new HashMap<>();
+ Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@@ -70,7 +70,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
changed = true;
}
- Map<String, String> otherIngestMetadata;
+ Map<String, Object> otherIngestMetadata;
if (randomBoolean()) {
otherIngestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
@@ -103,7 +103,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
for (int i = 0; i < numFields; i++) {
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
}
- Map<String, String> ingestMetadata = new HashMap<>();
+ Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@@ -131,7 +131,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
Map<String, Object> toXContentDoc = (Map<String, Object>) toXContentMap.get("doc");
Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source");
- Map<String, String> toXContentIngestMetadata = (Map<String, String>) toXContentDoc.get("_ingest");
+ Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest");
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {