diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java | 40 |
1 files changed, 26 insertions, 14 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 0026064489..f008bf9a4e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -69,14 +70,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite } /** - * Creates a bulk request caused by some other request, which is provided as an - * argument so that its headers and context can be copied to the new request - */ - public BulkRequest(ActionRequest<?> request) { - super(request); - } - - /** * Adds a list of requests to be executed. Either index or delete requests. */ public BulkRequest add(ActionRequest<?>... requests) { @@ -253,17 +246,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite * Adds a framed data in binary format */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { - return add(data, defaultIndex, defaultType, null, null, null, true); + return add(data, defaultIndex, defaultType, null, null, null, null, true); } /** * Adds a framed data in binary format */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex) throws Exception { - return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex); + return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex); } - public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { + public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { XContent xContent = XContentFactory.xContent(data); int line = 0; int from = 0; @@ -304,6 +297,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite long version = Versions.MATCH_ANY; VersionType versionType = VersionType.INTERNAL; int retryOnConflict = 0; + String pipeline = defaultPipeline; // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters @@ -344,6 +338,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite versionType = VersionType.fromString(parser.text()); } else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) { retryOnConflict = parser.intValue(); + } else if ("pipeline".equals(currentFieldName)) { + pipeline = parser.text(); } else if ("fields".equals(currentFieldName)) { throw new IllegalArgumentException("Action/metadata line [" + line + "] contains a simple value for parameter [fields] while a list is expected"); } else { @@ -380,15 +376,15 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite if ("index".equals(action)) { if (opType == null) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .source(data.slice(from, nextMarker - from)), payload); + .setPipeline(pipeline).source(data.slice(from, nextMarker - from)), payload); } else { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .create("create".equals(opType)) + .create("create".equals(opType)).setPipeline(pipeline) .source(data.slice(from, nextMarker - from)), payload); } } else if ("create".equals(action)) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .create(true) + .create(true).setPipeline(pipeline) .source(data.slice(from, nextMarker - from)), payload); } else if ("update".equals(action)) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict) @@ -479,6 +475,22 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite return -1; } + /** + * @return Whether this bulk request contains index request with an ingest pipeline enabled. + */ + public boolean hasIndexRequestsWithPipelines() { + for (ActionRequest actionRequest : requests) { + if (actionRequest instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) actionRequest; + if (Strings.hasText(indexRequest.getPipeline())) { + return true; + } + } + } + + return false; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; |