summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTal Levy <JubBoy333@gmail.com>2016-05-24 14:25:40 -0700
committerTal Levy <JubBoy333@gmail.com>2016-05-24 14:25:40 -0700
commit0fa67e1538e189ff1b6b46c602ccf0cc6ff22bd1 (patch)
tree9fb95c0c7ac3d5037b5301ae721255fbe67f5b97
parent84dfa360b14c0809a236df0601d7a47b5aa4598f (diff)
Expose underlying processor to blame for thrown exception within CompoundProcessor (#18342)
Fixes #17823
-rw-r--r--core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java44
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java6
-rw-r--r--core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java76
-rw-r--r--core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java10
6 files changed, 119 insertions, 26 deletions
diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
index 16b3aa10a2..fb10f8a192 100644
--- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
+++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
@@ -20,14 +20,13 @@
package org.elasticsearch.ingest.core;
-import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.ElasticsearchException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -94,30 +93,38 @@ public class CompoundProcessor implements Processor {
try {
processor.execute(ingestDocument);
} catch (Exception e) {
+ ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
- throw e;
+ throw compoundProcessorException;
} else {
- executeOnFailure(ingestDocument, e, processor.getType(), processor.getTag());
+ executeOnFailure(ingestDocument, compoundProcessorException);
}
- break;
}
}
}
- void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
+ void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
- putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag);
+ putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
- processor.execute(ingestDocument);
+ try {
+ processor.execute(ingestDocument);
+ } catch (Exception e) {
+ throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
+ }
}
} finally {
removeFailureMetadata(ingestDocument);
}
}
- private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) {
+ private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
+ List<String> processorTypeHeader = cause.getHeader("processor_type");
+ List<String> processorTagHeader = cause.getHeader("processor_tag");
+ String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
+ String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
- ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
+ ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
}
@@ -128,4 +135,21 @@ public class CompoundProcessor implements Processor {
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
}
+
+ private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
+ if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) {
+ return (ElasticsearchException) e;
+ }
+
+ ElasticsearchException exception = new ElasticsearchException(new IllegalArgumentException(e));
+
+ if (processorType != null) {
+ exception.addHeader("processor_type", processorType);
+ }
+ if (processorTag != null) {
+ exception.addHeader("processor_tag", processorTag);
+ }
+
+ return exception;
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
index 56e67bb4ad..30c29f8db5 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.ingest;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor;
@@ -167,7 +168,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
- RuntimeException runtimeException = (RuntimeException) simulateDocumentBaseResult.getFailure();
- assertThat(runtimeException.getMessage(), equalTo("processor failed"));
+ Exception exception = simulateDocumentBaseResult.getFailure();
+ assertThat(exception, instanceOf(ElasticsearchException.class));
+ assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
}
}
diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
index e5f2049943..111133a052 100644
--- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
+++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
@@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -154,7 +155,8 @@ public class IngestClientIT extends ESIntegTestCase {
BulkItemResponse itemResponse = response.getItems()[i];
if (i % 2 == 0) {
BulkItemResponse.Failure failure = itemResponse.getFailure();
- assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: test processor failed"));
+ ElasticsearchException compoundProcessorException = (ElasticsearchException) failure.getCause();
+ assertThat(compoundProcessorException.getRootCause().getMessage(), equalTo("test processor failed"));
} else {
IndexResponse indexResponse = itemResponse.getResponse();
assertThat("Expected a successful response but found failure [" + itemResponse.getFailure() + "].",
diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
index 3c0de328c8..254057d2ed 100644
--- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -188,6 +189,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteSuccessWithOnFailure() throws Exception {
Processor processor = mock(Processor.class);
+ when(processor.getType()).thenReturn("mock_processor_type");
+ when(processor.getTag()).thenReturn("mock_processor_tag");
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
@@ -198,7 +201,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
- verify(failureHandler, never()).accept(any(RuntimeException.class));
+ verify(failureHandler, never()).accept(any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(true);
}
diff --git a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java
index 7bc8922af4..b4ee7eca07 100644
--- a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java
@@ -19,21 +19,17 @@
package org.elasticsearch.ingest.core;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ingest.TestProcessor;
-import org.elasticsearch.ingest.TestTemplateService;
-import org.elasticsearch.ingest.processor.AppendProcessor;
-import org.elasticsearch.ingest.processor.SetProcessor;
-import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
public class CompoundProcessorTests extends ESTestCase {
@@ -70,8 +66,8 @@ public class CompoundProcessorTests extends ESTestCase {
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
- } catch (Exception e) {
- assertThat(e.getMessage(), equalTo("error"));
+ } catch (ElasticsearchException e) {
+ assertThat(e.getRootCause().getMessage(), equalTo("error"));
}
assertThat(processor.getInvokedCounter(), equalTo(1));
}
@@ -117,4 +113,68 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
}
+
+ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
+ TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
+ TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
+ Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+ assertThat(ingestMetadata.entrySet(), hasSize(3));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
+ });
+
+ CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);
+
+ CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
+ Collections.singletonList(secondProcessor));
+ compoundProcessor.execute(ingestDocument);
+
+ assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
+ assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+ }
+
+ public void testCompoundProcessorExceptionFail() throws Exception {
+ TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
+ TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
+ TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
+ Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+ assertThat(ingestMetadata.entrySet(), hasSize(3));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
+ });
+
+ CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
+ Collections.singletonList(failProcessor));
+
+ CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
+ Collections.singletonList(secondProcessor));
+ compoundProcessor.execute(ingestDocument);
+
+ assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
+ assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+ }
+
+ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
+ TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
+ TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
+ TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
+ Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+ assertThat(ingestMetadata.entrySet(), hasSize(3));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
+ assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
+ });
+
+ CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
+ Collections.singletonList(new CompoundProcessor(failProcessor)));
+
+ CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
+ Collections.singletonList(secondProcessor));
+ compoundProcessor.execute(ingestDocument);
+
+ assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
+ assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java
index e53eec56cf..2b53a9d08b 100644
--- a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
@@ -73,8 +74,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
try {
trackingProcessor.execute(ingestDocument);
- } catch (Exception e) {
- assertThat(e.getMessage(), equalTo(exception.getMessage()));
+ fail("processor should throw exception");
+ } catch (ElasticsearchException e) {
+ assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
}
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
@@ -121,8 +123,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
- assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound"));
- assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail"));
+ assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
+ assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}