summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java40
1 files changed, 26 insertions, 14 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
index 0026064489..f008bf9a4e 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
@@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -69,14 +70,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
}
/**
- * Creates a bulk request caused by some other request, which is provided as an
- * argument so that its headers and context can be copied to the new request
- */
- public BulkRequest(ActionRequest<?> request) {
- super(request);
- }
-
- /**
* Adds a list of requests to be executed. Either index or delete requests.
*/
public BulkRequest add(ActionRequest<?>... requests) {
@@ -253,17 +246,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
- return add(data, defaultIndex, defaultType, null, null, null, true);
+ return add(data, defaultIndex, defaultType, null, null, null, null, true);
}
/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex) throws Exception {
- return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex);
+ return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex);
}
- public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
+ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
XContent xContent = XContentFactory.xContent(data);
int line = 0;
int from = 0;
@@ -304,6 +297,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
int retryOnConflict = 0;
+ String pipeline = defaultPipeline;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
@@ -344,6 +338,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
versionType = VersionType.fromString(parser.text());
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
+ } else if ("pipeline".equals(currentFieldName)) {
+ pipeline = parser.text();
} else if ("fields".equals(currentFieldName)) {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains a simple value for parameter [fields] while a list is expected");
} else {
@@ -380,15 +376,15 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
- .source(data.slice(from, nextMarker - from)), payload);
+ .setPipeline(pipeline).source(data.slice(from, nextMarker - from)), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
- .create("create".equals(opType))
+ .create("create".equals(opType)).setPipeline(pipeline)
.source(data.slice(from, nextMarker - from)), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
- .create(true)
+ .create(true).setPipeline(pipeline)
.source(data.slice(from, nextMarker - from)), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
@@ -479,6 +475,22 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
return -1;
}
+ /**
+ * @return Whether this bulk request contains index request with an ingest pipeline enabled.
+ */
+ public boolean hasIndexRequestsWithPipelines() {
+ for (ActionRequest actionRequest : requests) {
+ if (actionRequest instanceof IndexRequest) {
+ IndexRequest indexRequest = (IndexRequest) actionRequest;
+ if (Strings.hasText(indexRequest.getPipeline())) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;