summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java')
-rw-r--r--core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java165
1 files changed, 165 insertions, 0 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java
new file mode 100644
index 0000000000..66c2a0183e
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java
@@ -0,0 +1,165 @@
+package org.elasticsearch.action.ingest;
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+
+public class BulkRequestModifierTests extends ESTestCase {
+
+ public void testBulkRequestModifier() {
+ int numRequests = scaledRandomIntBetween(8, 64);
+ BulkRequest bulkRequest = new BulkRequest();
+ for (int i = 0; i < numRequests; i++) {
+ bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
+ }
+ CaptureActionListener actionListener = new CaptureActionListener();
+ IngestActionFilter.BulkRequestModifier bulkRequestModifier = new IngestActionFilter.BulkRequestModifier(bulkRequest);
+
+ int i = 0;
+ Set<Integer> failedSlots = new HashSet<>();
+ while (bulkRequestModifier.hasNext()) {
+ bulkRequestModifier.next();
+ if (randomBoolean()) {
+ bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
+ failedSlots.add(i);
+ }
+ i++;
+ }
+
+ assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
+ // simulate that we actually executed the modified bulk request:
+ ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener);
+ result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
+
+ BulkResponse bulkResponse = actionListener.getResponse();
+ for (int j = 0; j < bulkResponse.getItems().length; j++) {
+ if (failedSlots.contains(j)) {
+ BulkItemResponse item = bulkResponse.getItems()[j];
+ assertThat(item.isFailed(), is(true));
+ assertThat(item.getFailure().getIndex(), equalTo("_index"));
+ assertThat(item.getFailure().getType(), equalTo("_type"));
+ assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
+ assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
+ } else {
+ assertThat(bulkResponse.getItems()[j], nullValue());
+ }
+ }
+ }
+
+ public void testPipelineFailures() {
+ BulkRequest originalBulkRequest = new BulkRequest();
+ for (int i = 0; i < 32; i++) {
+ originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
+ }
+
+ IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
+ for (int i = 0; modifier.hasNext(); i++) {
+ modifier.next();
+ if (i % 2 == 0) {
+ modifier.markCurrentItemAsFailed(new RuntimeException());
+ }
+ }
+
+ // So half of the requests have "failed", so only the successful requests are left:
+ BulkRequest bulkRequest = modifier.getBulkRequest();
+ assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
+
+ List<BulkItemResponse> responses = new ArrayList<>();
+ ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(new ActionListener<BulkResponse>() {
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+ responses.addAll(Arrays.asList(bulkItemResponses.getItems()));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ }
+ });
+
+ List<BulkItemResponse> originalResponses = new ArrayList<>();
+ for (ActionRequest actionRequest : bulkRequest.requests()) {
+ IndexRequest indexRequest = (IndexRequest) actionRequest;
+ IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), indexRequest.id(), 1, true);
+ originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse));
+ }
+ bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));
+
+ assertThat(responses.size(), Matchers.equalTo(32));
+ for (int i = 0; i < 32; i++) {
+ assertThat(responses.get(i).getId(), Matchers.equalTo(String.valueOf(i)));
+ }
+ }
+
+ public void testNoFailures() {
+ BulkRequest originalBulkRequest = new BulkRequest();
+ for (int i = 0; i < 32; i++) {
+ originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
+ }
+
+ IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
+ while (modifier.hasNext()) {
+ modifier.next();
+ }
+
+ BulkRequest bulkRequest = modifier.getBulkRequest();
+ assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
+ @SuppressWarnings("unchecked")
+ ActionListener<BulkResponse> actionListener = mock(ActionListener.class);
+ assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener));
+ }
+
+ private static class CaptureActionListener implements ActionListener<BulkResponse> {
+
+ private BulkResponse response;
+
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+ this.response = bulkItemResponses;
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ }
+
+ public BulkResponse getResponse() {
+ return response;
+ }
+ }
+}