diff options
author | Tal Levy <JubBoy333@gmail.com> | 2016-05-24 14:25:40 -0700 |
---|---|---|
committer | Tal Levy <JubBoy333@gmail.com> | 2016-05-24 14:25:40 -0700 |
commit | 0fa67e1538e189ff1b6b46c602ccf0cc6ff22bd1 (patch) | |
tree | 9fb95c0c7ac3d5037b5301ae721255fbe67f5b97 | |
parent | 84dfa360b14c0809a236df0601d7a47b5aa4598f (diff) |
Expose underlying processor to blame for thrown exception within CompoundProcessor (#18342)
Fixes #17823
6 files changed, 119 insertions, 26 deletions
diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java index 16b3aa10a2..fb10f8a192 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java @@ -20,14 +20,13 @@ package org.elasticsearch.ingest.core; -import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.ElasticsearchException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; /** @@ -94,30 +93,38 @@ public class CompoundProcessor implements Processor { try { processor.execute(ingestDocument); } catch (Exception e) { + ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (onFailureProcessors.isEmpty()) { - throw e; + throw compoundProcessorException; } else { - executeOnFailure(ingestDocument, e, processor.getType(), processor.getTag()); + executeOnFailure(ingestDocument, compoundProcessorException); } - break; } } } - void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception { + void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { - putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag); + putFailureMetadata(ingestDocument, exception); for (Processor processor : onFailureProcessors) { - processor.execute(ingestDocument); + try { + processor.execute(ingestDocument); + } catch (Exception e) { + throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); + } } } finally { removeFailureMetadata(ingestDocument); } } - private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) { + private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { + List<String> processorTypeHeader = cause.getHeader("processor_type"); + List<String> processorTagHeader = cause.getHeader("processor_tag"); + String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null; + String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null; Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); - ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage()); + ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage()); ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); } @@ -128,4 +135,21 @@ public class CompoundProcessor implements Processor { ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); } + + private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) { + if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) { + return (ElasticsearchException) e; + } + + ElasticsearchException exception = new ElasticsearchException(new IllegalArgumentException(e)); + + if (processorType != null) { + exception.addHeader("processor_type", processorType); + } + if (processorTag != null) { + exception.addHeader("processor_tag", processorTag); + } + + return exception; + } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 56e67bb4ad..30c29f8db5 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestProcessor; @@ -167,7 +168,8 @@ public class SimulateExecutionServiceTests extends ESTestCase { SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue()); assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class)); - RuntimeException runtimeException = (RuntimeException) simulateDocumentBaseResult.getFailure(); - assertThat(runtimeException.getMessage(), equalTo("processor failed")); + Exception exception = simulateDocumentBaseResult.getFailure(); + assertThat(exception, instanceOf(ElasticsearchException.class)); + assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index e5f2049943..111133a052 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -154,7 +155,8 @@ public class IngestClientIT extends ESIntegTestCase { BulkItemResponse itemResponse = response.getItems()[i]; if (i % 2 == 0) { BulkItemResponse.Failure failure = itemResponse.getFailure(); - assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: test processor failed")); + ElasticsearchException compoundProcessorException = (ElasticsearchException) failure.getCause(); + assertThat(compoundProcessorException.getRootCause().getMessage(), equalTo("test processor failed")); } else { IndexResponse indexResponse = itemResponse.getResponse(); assertThat("Expected a successful response but found failure [" + itemResponse.getFailure() + "].", diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 3c0de328c8..254057d2ed 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -188,6 +189,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecuteSuccessWithOnFailure() throws Exception { Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock_processor_type"); + when(processor.getTag()).thenReturn("mock_processor_tag"); Processor onFailureProcessor = mock(Processor.class); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); @@ -198,7 +201,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { @SuppressWarnings("unchecked") Consumer<Boolean> completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(failureHandler, never()).accept(any(RuntimeException.class)); + verify(failureHandler, never()).accept(any(ElasticsearchException.class)); verify(completionHandler, times(1)).accept(true); } diff --git a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java index 7bc8922af4..b4ee7eca07 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/CompoundProcessorTests.java @@ -19,21 +19,17 @@ package org.elasticsearch.ingest.core; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.ingest.processor.AppendProcessor; -import org.elasticsearch.ingest.processor.SetProcessor; -import org.elasticsearch.ingest.processor.SplitProcessor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; public class CompoundProcessorTests extends ESTestCase { @@ -70,8 +66,8 @@ public class CompoundProcessorTests extends ESTestCase { try { compoundProcessor.execute(ingestDocument); fail("should throw exception"); - } catch (Exception e) { - assertThat(e.getMessage(), equalTo("error")); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo("error")); } assertThat(processor.getInvokedCounter(), equalTo(1)); } @@ -117,4 +113,68 @@ public class CompoundProcessorTests extends ESTestCase { assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); } + + public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { + TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { + Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.entrySet(), hasSize(3)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); + }); + + CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); + + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + Collections.singletonList(secondProcessor)); + compoundProcessor.execute(ingestDocument); + + assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); + assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + } + + public void testCompoundProcessorExceptionFail() throws Exception { + TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { + Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.entrySet(), hasSize(3)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); + }); + + CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), + Collections.singletonList(failProcessor)); + + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + Collections.singletonList(secondProcessor)); + compoundProcessor.execute(ingestDocument); + + assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); + assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + } + + public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { + TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { + Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.entrySet(), hasSize(3)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); + }); + + CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), + Collections.singletonList(new CompoundProcessor(failProcessor))); + + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), + Collections.singletonList(secondProcessor)); + compoundProcessor.execute(ingestDocument); + + assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); + assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java index e53eec56cf..2b53a9d08b 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest.processor; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.core.CompoundProcessor; @@ -73,8 +74,9 @@ public class TrackingResultProcessorTests extends ESTestCase { try { trackingProcessor.execute(ingestDocument); - } catch (Exception e) { - assertThat(e.getMessage(), equalTo(exception.getMessage())); + fail("processor should throw exception"); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); } SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); @@ -121,8 +123,8 @@ public class TrackingResultProcessorTests extends ESTestCase { metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); assertThat(resultList.get(3).getFailure(), nullValue()); assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); } |