diff options
author | Areek Zillur <areek.zillur@elasticsearch.com> | 2016-08-22 13:08:49 -0400 |
---|---|---|
committer | Areek Zillur <areek.zillur@elasticsearch.com> | 2016-08-23 10:33:37 -0400 |
commit | 80ca78479f5046f58374ed831f6b2c3d51f530f9 (patch) | |
tree | 88e15ab5cb56c2b629d9484e08e99d9f0e8daa4a /core | |
parent | d4dec26aa00ced5ead648881301a035272210288 (diff) |
Make bulk item-level requests implement DocumentRequest interface
Currently, bulk item requests can be any ActionRequest, this commit
restricts bulk item requests to DocumentRequest. This simplifies
handling failures during bulk requests. Additionally, a new enum
is added to DocumentRequest to represent the intended operation
to be performed by a document request. Now, index operation type
also uses the new enum to specify whether the request should
create or index a document.
Diffstat (limited to 'core')
25 files changed, 304 insertions, 344 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java index a90f013a6b..50af0dc780 100644 --- a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java +++ b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java @@ -19,11 +19,13 @@ package org.elasticsearch.action; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.index.VersionType; + +import java.util.Locale; /** - * Generic interface to group ActionRequest, which work on single document level - * - * Forces this class return index/type/id getters + * Generic interface to group ActionRequest, which perform writes to a single document + * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ public interface DocumentRequest<T> extends IndicesRequest { @@ -70,4 +72,78 @@ public interface DocumentRequest<T> extends IndicesRequest { */ String parent(); + /** + * Get the document version for this request + * @return the document version + */ + long version(); + + /** + * Sets the version, which will perform the operation only if a matching + * version exists and no changes happened on the doc since then. + */ + T version(long version); + + /** + * Get the document version type for this request + * @return the document version type + */ + VersionType versionType(); + + /** + * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. + */ + T versionType(VersionType versionType); + + /** + * Get the requested document operation type of the request + * @return the operation type {@link OpType} + */ + OpType opType(); + + /** + * Requested operation type to perform on the document + */ + enum OpType { + /** + * Creates the resource. Simply adds it to the index, if there is an existing + * document with the id, then it won't be removed. + */ + CREATE(0), + /** + * Index the source. If there an existing document with the id, it will + * be replaced. + */ + INDEX(1), + /** Updates a document */ + UPDATE(2), + /** Deletes a document */ + DELETE(3); + + private final byte op; + private final String lowercase; + + OpType(int op) { + this.op = (byte) op; + this.lowercase = this.toString().toLowerCase(Locale.ENGLISH); + } + + public byte getId() { + return op; + } + + public String getLowercase() { + return lowercase; + } + + public static OpType fromId(byte id) { + switch (id) { + case 0: return CREATE; + case 1: return INDEX; + case 2: return UPDATE; + case 3: return DELETE; + default: throw new IllegalArgumentException("Unknown opType: [" + id + "]"); + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 760c5781ae..79503fcf9e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,8 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -36,7 +35,7 @@ import java.io.IOException; public class BulkItemRequest implements Streamable { private int id; - private ActionRequest request; + private DocumentRequest<?> request; private volatile BulkItemResponse primaryResponse; private volatile boolean ignoreOnReplica; @@ -44,8 +43,7 @@ public class BulkItemRequest implements Streamable { } - public BulkItemRequest(int id, ActionRequest request) { - assert request instanceof IndicesRequest; + public BulkItemRequest(int id, DocumentRequest<?> request) { this.id = id; this.request = request; } @@ -54,14 +52,13 @@ public class BulkItemRequest implements Streamable { return id; } - public ActionRequest request() { + public DocumentRequest<?> request() { return request; } public String index() { - IndicesRequest indicesRequest = (IndicesRequest) request; - assert indicesRequest.indices().length == 1; - return indicesRequest.indices()[0]; + assert request.indices().length == 1; + return request.indices()[0]; } BulkItemResponse getPrimaryResponse() { @@ -94,13 +91,18 @@ public class BulkItemRequest implements Streamable { id = in.readVInt(); byte type = in.readByte(); if (type == 0) { - request = new IndexRequest(); + IndexRequest indexRequest = new IndexRequest(); + indexRequest.readFrom(in); + request = indexRequest; } else if (type == 1) { - request = new DeleteRequest(); + DeleteRequest deleteRequest = new DeleteRequest(); + deleteRequest.readFrom(in); + request = deleteRequest; } else if (type == 2) { - request = new UpdateRequest(); + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.readFrom(in); + request = updateRequest; } - request.readFrom(in); if (in.readBoolean()) { primaryResponse = BulkItemResponse.readBulkItem(in); } @@ -112,12 +114,14 @@ public class BulkItemRequest implements Streamable { out.writeVInt(id); if (request instanceof IndexRequest) { out.writeByte((byte) 0); + ((IndexRequest) request).writeTo(out); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); + ((DeleteRequest) request).writeTo(out); } else if (request instanceof UpdateRequest) { out.writeByte((byte) 2); + ((UpdateRequest) request).writeTo(out); } - request.writeTo(out); out.writeOptionalStreamable(primaryResponse); out.writeBoolean(ignoreOnReplica); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index ad45ace84c..adeda64ee5 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocumentRequest.OpType; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; @@ -50,7 +51,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(opType); + builder.startObject(opType.getLowercase()); if (failure == null) { response.toXContent(builder, params); builder.field(Fields.STATUS, response.status().getStatus()); @@ -183,7 +184,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent { private int id; - private String opType; + private OpType opType; private DocWriteResponse response; @@ -193,13 +194,13 @@ public class BulkItemResponse implements Streamable, StatusToXContent { } - public BulkItemResponse(int id, String opType, DocWriteResponse response) { + public BulkItemResponse(int id, OpType opType, DocWriteResponse response) { this.id = id; - this.opType = opType; this.response = response; + this.opType = opType; } - public BulkItemResponse(int id, String opType, Failure failure) { + public BulkItemResponse(int id, OpType opType, Failure failure) { this.id = id; this.opType = opType; this.failure = failure; @@ -215,7 +216,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent { /** * The operation type ("index", "create" or "delete"). */ - public String getOpType() { + public OpType getOpType() { return this.opType; } @@ -300,7 +301,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); - opType = in.readString(); + opType = OpType.fromId(in.readByte()); byte type = in.readByte(); if (type == 0) { @@ -322,7 +323,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - out.writeString(opType); + out.writeByte(opType.getId()); if (response == null) { out.writeByte((byte) 2); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index c54b3588c1..4881a9444b 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; @@ -250,24 +250,24 @@ public class BulkProcessor implements Closeable { * (for example, if no id is provided, one will be generated, or usage of the create flag). */ public BulkProcessor add(IndexRequest request) { - return add((ActionRequest<?>) request); + return add((DocumentRequest<?>) request); } /** * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkProcessor add(DeleteRequest request) { - return add((ActionRequest<?>) request); + return add((DocumentRequest<?>) request); } /** * Adds either a delete or an index request. */ - public BulkProcessor add(ActionRequest<?> request) { + public BulkProcessor add(DocumentRequest<?> request) { return add(request, null); } - public BulkProcessor add(ActionRequest<?> request, @Nullable Object payload) { + public BulkProcessor add(DocumentRequest<?> request, @Nullable Object payload) { internalAdd(request, payload); return this; } @@ -282,7 +282,7 @@ public class BulkProcessor implements Closeable { } } - private synchronized void internalAdd(ActionRequest<?> request, @Nullable Object payload) { + private synchronized void internalAdd(DocumentRequest<?> request, @Nullable Object payload) { ensureOpen(); bulkRequest.add(request, payload); executeIfNeeded(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 7e7aa4ce60..538dfc4c3a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -46,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -65,7 +67,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite * {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare * the one with the least casts. */ - final List<ActionRequest<?>> requests = new ArrayList<>(); + final List<DocumentRequest<?>> requests = new ArrayList<>(); List<Object> payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; @@ -80,14 +82,14 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(ActionRequest<?>... requests) { - for (ActionRequest<?> request : requests) { + public BulkRequest add(DocumentRequest<?>... requests) { + for (DocumentRequest<?> request : requests) { add(request, null); } return this; } - public BulkRequest add(ActionRequest<?> request) { + public BulkRequest add(DocumentRequest<?> request) { return add(request, null); } @@ -97,7 +99,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite * @param payload Optional payload * @return the current bulk request */ - public BulkRequest add(ActionRequest<?> request, @Nullable Object payload) { + public BulkRequest add(DocumentRequest<?> request, @Nullable Object payload) { if (request instanceof IndexRequest) { add((IndexRequest) request, payload); } else if (request instanceof DeleteRequest) { @@ -113,8 +115,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(Iterable<ActionRequest<?>> requests) { - for (ActionRequest<?> request : requests) { + public BulkRequest add(Iterable<DocumentRequest<?>> requests) { + for (DocumentRequest<?> request : requests) { add(request); } return this; @@ -200,18 +202,13 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite /** * The list of requests in this bulk request. */ - public List<ActionRequest<?>> requests() { + public List<DocumentRequest<?>> requests() { return this.requests; } @Override public List<? extends IndicesRequest> subRequests() { - List<IndicesRequest> indicesRequests = new ArrayList<>(); - for (ActionRequest<?> request : requests) { - assert request instanceof IndicesRequest; - indicesRequests.add((IndicesRequest) request); - } - return indicesRequests; + return requests.stream().collect(Collectors.toList()); } /** @@ -497,7 +494,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite * @return Whether this bulk request contains index request with an ingest pipeline enabled. */ public boolean hasIndexRequestsWithPipelines() { - for (ActionRequest<?> actionRequest : requests) { + for (DocumentRequest<?> actionRequest : requests) { if (actionRequest instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { @@ -515,13 +512,13 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite if (requests.isEmpty()) { validationException = addValidationError("no requests added", validationException); } - for (ActionRequest<?> request : requests) { + for (DocumentRequest<?> request : requests) { // We first check if refresh has been set if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( "RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException); } - ActionRequestValidationException ex = request.validate(); + ActionRequestValidationException ex = ((WriteRequest<?>) request).validate(); if (ex != null) { if (validationException == null) { validationException = new ActionRequestValidationException(); @@ -563,15 +560,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite super.writeTo(out); waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); - for (ActionRequest<?> request : requests) { + for (DocumentRequest<?> request : requests) { if (request instanceof IndexRequest) { out.writeByte((byte) 0); + ((IndexRequest) request).writeTo(out); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); + ((DeleteRequest) request).writeTo(out); } else if (request instanceof UpdateRequest) { out.writeByte((byte) 2); + ((UpdateRequest) request).writeTo(out); } - request.writeTo(out); } refreshPolicy.writeTo(out); timeout.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index da080b54b2..f7861d1e09 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,11 +19,9 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -58,18 +56,18 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** - * + * Groups bulk request items by shard, optionally creating non-existent indices and + * delegates to {@link TransportShardBulkAction} for shard-level bulk execution */ public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> { @@ -119,15 +117,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul if (needToCheck()) { // Keep track of all unique indices and all unique types per index for the create index requests: - final Set<String> autoCreateIndices = new HashSet<>(); - for (ActionRequest request : bulkRequest.requests) { - if (request instanceof DocumentRequest) { - DocumentRequest req = (DocumentRequest) request; - autoCreateIndices.add(req.index()); - } else { - throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); - } - } + final Set<String> autoCreateIndices = bulkRequest.requests.stream() + .map(DocumentRequest::index) + .collect(Collectors.toSet()); final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); ClusterState state = clusterService.state(); for (String index : autoCreateIndices) { @@ -153,7 +145,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) { // fail all requests involving this index, if create didnt work for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); + DocumentRequest<?> request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { bulkRequest.requests.set(i, null); } @@ -188,27 +180,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul return autoCreateIndex.shouldAutoCreate(index, state); } - private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Exception e) { - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - if (index.equals(indexRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e))); - return true; - } - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - if (index.equals(deleteRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e))); - return true; - } - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - if (index.equals(updateRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e))); - return true; - } - } else { - throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); + private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocumentRequest<?> request, String index, Exception e) { + if (index.equals(request.index())) { + responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e))); + return true; } return false; } @@ -236,95 +211,56 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); MetaData metaData = clusterState.metaData(); for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); + DocumentRequest<?> documentRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored - if (request == null) { + if (documentRequest == null) { continue; } - DocumentRequest documentRequest = (DocumentRequest) request; if (addFailureIfIndexIsUnavailable(documentRequest, bulkRequest, responses, i, concreteIndices, metaData)) { continue; } Index concreteIndex = concreteIndices.resolveIfAbsent(documentRequest); - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - MappingMetaData mappingMd = null; - final IndexMetaData indexMetaData = metaData.index(concreteIndex); - if (indexMetaData != null) { - mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); - } - try { - indexRequest.resolveRouting(metaData); - indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); - } catch (ElasticsearchParseException | RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), indexRequest.type(), indexRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - } else if (request instanceof DeleteRequest) { - try { - TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)request); - } catch(RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "delete", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - - } else if (request instanceof UpdateRequest) { - try { - TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)request); - } catch(RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "update", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); + try { + switch (documentRequest.opType()) { + case CREATE: + case INDEX: + IndexRequest indexRequest = (IndexRequest) documentRequest; + MappingMetaData mappingMd = null; + final IndexMetaData indexMetaData = metaData.index(concreteIndex); + if (indexMetaData != null) { + mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); + } + indexRequest.resolveRouting(metaData); + indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); + break; + case UPDATE: + TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)documentRequest); + break; + case DELETE: + TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)documentRequest); + break; + default: throw new AssertionError("request type not supported: [" + documentRequest.opType() + "]"); } - } else { - throw new AssertionError("request type not supported: [" + request.getClass().getName() + "]"); + } catch (ElasticsearchParseException | RoutingMissingException e) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e); + BulkItemResponse bulkItemResponse = new BulkItemResponse(i, documentRequest.opType(), failure); + responses.set(i, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(i, null); } } // first, go over all the requests and create a ShardId -> Operations mapping Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(indexRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, indexRequest.id(), indexRequest.routing()).shardId(); - List<BulkItemRequest> list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.id(), deleteRequest.routing()).shardId(); - List<BulkItemRequest> list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.id(), updateRequest.routing()).shardId(); - List<BulkItemRequest> list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); + DocumentRequest<?> request = bulkRequest.requests.get(i); + if (request == null) { + continue; } + String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); + List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, request)); } if (requestsByShard.isEmpty()) { @@ -364,19 +300,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul // create failures for all relevant requests for (BulkItemRequest request : requests) { final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - if (request.request() instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH), - new BulkItemResponse.Failure(indexName, indexRequest.type(), indexRequest.id(), e))); - } else if (request.request() instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), "delete", - new BulkItemResponse.Failure(indexName, deleteRequest.type(), deleteRequest.id(), e))); - } else if (request.request() instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), "update", - new BulkItemResponse.Failure(indexName, updateRequest.type(), updateRequest.id(), e))); - } + DocumentRequest<?> documentRequest = request.request(); + responses.set(request.id(), new BulkItemResponse(request.id(), documentRequest.opType(), + new BulkItemResponse.Failure(indexName, documentRequest.type(), documentRequest.id(), e))); } if (counter.decrementAndGet() == 0) { finishHim(); @@ -413,15 +339,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul if (unavailableException != null) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), unavailableException); - String operationType = "unknown"; - if (request instanceof IndexRequest) { - operationType = "index"; - } else if (request instanceof DeleteRequest) { - operationType = "delete"; - } else if (request instanceof UpdateRequest) { - operationType = "update"; - } - BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, operationType, failure); + BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 745449c0a7..84a5197ea9 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -71,9 +71,6 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation. */ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> { - private static final String OP_TYPE_UPDATE = "update"; - private static final String OP_TYPE_DELETE = "delete"; - public static final String ACTION_NAME = BulkAction.NAME + "[s]"; private final UpdateHelper updateHelper; @@ -157,7 +154,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ location = locationToSync(location, result.getLocation()); // add the response IndexResponse indexResponse = result.getResponse(); - setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse)); + setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(), indexResponse)); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -174,7 +171,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ if (item.getPrimaryResponse() != null && isConflictException(e)) { setResponse(item, item.getPrimaryResponse()); } else { - setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(), new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e))); } } @@ -199,7 +196,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); DeleteResponse deleteResponse = writeResult.getResponse(); location = locationToSync(location, writeResult.getLocation()); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse)); + setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(), deleteResponse)); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -216,7 +213,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ if (item.getPrimaryResponse() != null && isConflictException(e)) { setResponse(item, item.getPrimaryResponse()); } else { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, + setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(), new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e))); } } @@ -254,7 +251,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); } item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse)); + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse)); break; case DELETED: @SuppressWarnings("unchecked") @@ -265,10 +262,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); // Replace the update request to the translated delete request to execute on the replica. item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse)); + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse)); break; case NOOP: - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult)); + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResult.noopResult)); item.setIgnoreOnReplica(); // no need to go to the replica break; default: @@ -281,7 +278,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ if (updateResult.retry) { // updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration if (updateAttemptsCount >= updateRequest.retryOnConflict()) { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e))); } } else { @@ -299,20 +296,20 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ if (item.getPrimaryResponse() != null && isConflictException(e)) { setResponse(item, item.getPrimaryResponse()); } else if (updateResult.result == null) { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e))); + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e))); } else { switch (updateResult.result.getResponseResult()) { case CREATED: case UPDATED: IndexRequest indexRequest = updateResult.request(); logFailure(e, "index", request.shardId(), indexRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, + setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e))); break; case DELETED: DeleteRequest deleteRequest = updateResult.request(); logFailure(e, "delete", request.shardId(), deleteRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, + setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(), new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e))); break; default: diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index bdf09e3e53..e3babcfc38 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -164,29 +164,34 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> impleme return this.routing; } - /** - * Sets the version, which will cause the delete operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public DeleteRequest version(long version) { this.version = version; return this; } + @Override public long version() { return this.version; } + @Override public DeleteRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } @Override + public OpType opType() { + return OpType.DELETE; + } + + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); type = in.readString(); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 63ede68b9f..910abf8728 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -69,67 +69,6 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; */ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocumentRequest<IndexRequest> { - /** - * Operation type controls if the type of the index operation. - */ - public static enum OpType { - /** - * Index the source. If there an existing document with the id, it will - * be replaced. - */ - INDEX((byte) 0), - /** - * Creates the resource. Simply adds it to the index, if there is an existing - * document with the id, then it won't be removed. - */ - CREATE((byte) 1); - - private final byte id; - private final String lowercase; - - OpType(byte id) { - this.id = id; - this.lowercase = this.toString().toLowerCase(Locale.ENGLISH); - } - - /** - * The internal representation of the operation type. - */ - public byte id() { - return id; - } - - public String lowercase() { - return this.lowercase; - } - - /** - * Constructs the operation type from its internal representation. - */ - public static OpType fromId(byte id) { - if (id == 0) { - return INDEX; - } else if (id == 1) { - return CREATE; - } else { - throw new IllegalArgumentException("No type match for [" + id + "]"); - } - } - - public static OpType fromString(String sOpType) { - String lowersOpType = sOpType.toLowerCase(Locale.ROOT); - switch (lowersOpType) { - case "create": - return OpType.CREATE; - case "index": - return OpType.INDEX; - default: - throw new IllegalArgumentException("opType [" + sOpType + "] not allowed, either [index] or [create] are allowed"); - } - } - - } - private String type; private String id; @Nullable @@ -506,6 +445,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement * Sets the type of operation to perform. */ public IndexRequest opType(OpType opType) { + if (opType != OpType.CREATE && opType != OpType.INDEX) { + throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]"); + } this.opType = opType; if (opType == OpType.CREATE) { version(Versions.MATCH_DELETED); @@ -515,11 +457,19 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement } /** - * Sets a string representation of the {@link #opType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can + * Sets a string representation of the {@link #opType(OpType)}. Can * be either "index" or "create". */ public IndexRequest opType(String opType) { - return opType(OpType.fromString(opType)); + String op = opType.toLowerCase(Locale.ROOT); + if (op.equals("create")) { + opType(OpType.CREATE); + } else if (op.equals("index")) { + opType(OpType.INDEX); + } else { + throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]"); + } + return this; } @@ -534,34 +484,29 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement } } - /** - * The type of operation to perform. - */ + @Override public OpType opType() { return this.opType; } - /** - * Sets the version, which will cause the index operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public IndexRequest version(long version) { this.version = version; return this; } + @Override public long version() { return this.version; } - /** - * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. - */ + @Override public IndexRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } @@ -651,7 +596,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement out.writeOptionalString(timestamp); out.writeOptionalWriteable(ttl); out.writeBytesReference(source); - out.writeByte(opType.id()); + out.writeByte(opType.getId()); out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java index 20587bf0ea..7d567b4bdb 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -200,17 +201,17 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest, /** * Sets the type of operation to perform. */ - public IndexRequestBuilder setOpType(IndexRequest.OpType opType) { + public IndexRequestBuilder setOpType(DocumentRequest.OpType opType) { request.opType(opType); return this; } /** - * Sets a string representation of the {@link #setOpType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can + * Sets a string representation of the {@link #setOpType(DocumentRequest.OpType)}. Can * be either "index" or "create". */ public IndexRequestBuilder setOpType(String opType) { - request.opType(IndexRequest.OpType.fromString(opType)); + request.opType(opType); return this; } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index cf7d2cf1e5..779b80e1e6 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -132,7 +133,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio return Integer.MAX_VALUE; } - static final class BulkRequestModifier implements Iterator<ActionRequest<?>> { + static final class BulkRequestModifier implements Iterator<DocumentRequest<?>> { final BulkRequest bulkRequest; final Set<Integer> failedSlots; @@ -148,7 +149,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } @Override - public ActionRequest next() { + public DocumentRequest<?> next() { return bulkRequest.requests().get(++currentSlot); } @@ -169,7 +170,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio int slot = 0; originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()]; for (int i = 0; i < bulkRequest.requests().size(); i++) { - ActionRequest request = bulkRequest.requests().get(i); + DocumentRequest<?> request = bulkRequest.requests().get(i); if (failedSlots.contains(i) == false) { modifiedBulkRequest.add(request); originalSlots[slot++] = i; @@ -205,7 +206,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio // 3) Continue with the next request in the bulk. failedSlots.add(currentSlot); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); - itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType().lowercase(), failure)); + itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); } } diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java index 3f33b2e390..1dd9b0c7d7 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.termvectors; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RealtimeRequest; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.get.MultiGetRequest; @@ -56,7 +55,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; * Note, the {@link #index()}, {@link #type(String)} and {@link #id(String)} are * required. */ -public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> implements DocumentRequest<TermVectorsRequest>, RealtimeRequest { +public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> implements RealtimeRequest { private String type; @@ -200,7 +199,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i /** * Returns the type of document to get the term vector for. */ - @Override public String type() { return type; } @@ -208,7 +206,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i /** * Returns the id of document the term vector is requested for. */ - @Override public String id() { return id; } @@ -250,18 +247,15 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i /** * @return The routing for this request. */ - @Override public String routing() { return routing; } - @Override public TermVectorsRequest routing(String routing) { this.routing = routing; return this; } - @Override public String parent() { return parent; } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 0d919ff089..662d26117b 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -398,32 +398,34 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> return this.retryOnConflict; } - /** - * Sets the version, which will cause the index operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public UpdateRequest version(long version) { this.version = version; return this; } + @Override public long version() { return this.version; } - /** - * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. - */ + @Override public UpdateRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } @Override + public OpType opType() { + return OpType.UPDATE; + } + + @Override public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) { this.refreshPolicy = refreshPolicy; return this; diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index e714663653..57eb7afcb5 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; @@ -68,7 +68,7 @@ public class PipelineExecutionService implements ClusterStateListener { }); } - public void executeBulkRequest(Iterable<ActionRequest<?>> actionRequests, + public void executeBulkRequest(Iterable<DocumentRequest<?>> actionRequests, BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler) { threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { @@ -80,7 +80,7 @@ public class PipelineExecutionService implements ClusterStateListener { @Override protected void doRun() throws Exception { - for (ActionRequest actionRequest : actionRequests) { + for (DocumentRequest<?> actionRequest : actionRequests) { if ((actionRequest instanceof IndexRequest)) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index 6c9723b5b9..f28a98f488 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -86,7 +86,7 @@ public class RestIndexAction extends BaseRestHandler { String sOpType = request.param("op_type"); if (sOpType != null) { try { - indexRequest.opType(IndexRequest.OpType.fromString(sOpType)); + indexRequest.opType(sOpType); } catch (IllegalArgumentException eia){ try { XContentBuilder builder = channel.newErrorBuilder(); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 142fb282c2..c88055f8dd 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -20,8 +20,8 @@ package org.elasticsearch.action.bulk; import org.apache.lucene.util.Constants; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -111,7 +111,7 @@ public class BulkRequestTests extends ESTestCase { public void testBulkAddIterable() { BulkRequest bulkRequest = Requests.bulkRequest(); - List<ActionRequest<?>> requests = new ArrayList<>(); + List<DocumentRequest<?>> requests = new ArrayList<>(); requests.add(new IndexRequest("test", "test", "id").source("field", "value")); requests.add(new UpdateRequest("test", "test", "id").doc("field", "value")); requests.add(new DeleteRequest("test", "test", "id")); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 16502ff92b..4c24e76c13 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.concurrent.CyclicBarrier; import java.util.function.Function; +import static org.elasticsearch.action.DocumentRequest.OpType; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.script.ScriptService.ScriptType; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -319,7 +320,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(response.getItems()[i].getVersion(), equalTo(1L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(1L)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(1)); @@ -357,7 +358,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(response.getItems()[i].getVersion(), equalTo(2L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(2L)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(2)); @@ -381,7 +382,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(response.getItems()[i].getVersion(), equalTo(3L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); } } @@ -398,7 +399,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); } builder = client().prepareBulk(); @@ -414,7 +415,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); for (int j = 0; j < 5; j++) { GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute() .actionGet(); @@ -755,12 +756,12 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertNoFailures(indexBulkItemResponse); assertThat(bulkItemResponse.getItems().length, is(6)); - assertThat(bulkItemResponse.getItems()[0].getOpType(), is("index")); - assertThat(bulkItemResponse.getItems()[1].getOpType(), is("index")); - assertThat(bulkItemResponse.getItems()[2].getOpType(), is("update")); - assertThat(bulkItemResponse.getItems()[3].getOpType(), is("update")); - assertThat(bulkItemResponse.getItems()[4].getOpType(), is("delete")); - assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete")); + assertThat(bulkItemResponse.getItems()[0].getOpType(), is(OpType.INDEX)); + assertThat(bulkItemResponse.getItems()[1].getOpType(), is(OpType.INDEX)); + assertThat(bulkItemResponse.getItems()[2].getOpType(), is(OpType.UPDATE)); + assertThat(bulkItemResponse.getItems()[3].getOpType(), is(OpType.UPDATE)); + assertThat(bulkItemResponse.getItems()[4].getOpType(), is(OpType.DELETE)); + assertThat(bulkItemResponse.getItems()[5].getOpType(), is(OpType.DELETE)); } private static String indexOrAlias() { @@ -805,9 +806,9 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat(bulkResponse.hasFailures(), is(true)); BulkItemResponse[] responseItems = bulkResponse.getItems(); assertThat(responseItems.length, is(3)); - assertThat(responseItems[0].getOpType(), is("index")); - assertThat(responseItems[1].getOpType(), is("update")); - assertThat(responseItems[2].getOpType(), is("delete")); + assertThat(responseItems[0].getOpType(), is(OpType.INDEX)); + assertThat(responseItems[1].getOpType(), is(OpType.UPDATE)); + assertThat(responseItems[2].getOpType(), is(OpType.DELETE)); } // issue 9821 @@ -817,9 +818,9 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { .add(client().prepareUpdate().setIndex("INVALID.NAME").setType("type1").setId("1").setDoc("field", randomInt())) .add(client().prepareDelete().setIndex("INVALID.NAME").setType("type1").setId("1")).get(); assertThat(bulkResponse.getItems().length, is(3)); - assertThat(bulkResponse.getItems()[0].getOpType(), is("index")); - assertThat(bulkResponse.getItems()[1].getOpType(), is("update")); - assertThat(bulkResponse.getItems()[2].getOpType(), is("delete")); + assertThat(bulkResponse.getItems()[0].getOpType(), is(OpType.INDEX)); + assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE)); + assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE)); } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index 4fa640b3ad..72bdc8a58f 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -20,7 +20,12 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.DocumentRequest.OpType; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; @@ -212,11 +217,11 @@ public class RetryTests extends ESTestCase { } private BulkItemResponse successfulResponse() { - return new BulkItemResponse(1, "update", new DeleteResponse()); + return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse()); } private BulkItemResponse failedResponse() { - return new BulkItemResponse(1, "update", new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full"))); + return new BulkItemResponse(1, OpType.INDEX, new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full"))); } } } diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index a8699dd3ea..e6fcad5443 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; @@ -43,18 +44,24 @@ public class IndexRequestTests extends ESTestCase { String createUpper = "CREATE"; String indexUpper = "INDEX"; - assertThat(IndexRequest.OpType.fromString(create), equalTo(IndexRequest.OpType.CREATE)); - assertThat(IndexRequest.OpType.fromString(index), equalTo(IndexRequest.OpType.INDEX)); - assertThat(IndexRequest.OpType.fromString(createUpper), equalTo(IndexRequest.OpType.CREATE)); - assertThat(IndexRequest.OpType.fromString(indexUpper), equalTo(IndexRequest.OpType.INDEX)); + IndexRequest indexRequest = new IndexRequest(""); + indexRequest.opType(create); + assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.CREATE)); + indexRequest.opType(createUpper); + assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.CREATE)); + indexRequest.opType(index); + assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.INDEX)); + indexRequest.opType(indexUpper); + assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.INDEX)); } public void testReadBogusString() { try { - IndexRequest.OpType.fromString("foobar"); + IndexRequest indexRequest = new IndexRequest(""); + indexRequest.opType("foobar"); fail("Expected IllegalArgumentException"); } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("opType [foobar] not allowed")); + assertThat(e.getMessage(), equalTo("opType must be 'create' or 'index', found: [foobar]")); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java index 9ee5036131..8dac5853ca 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.ingest; */ import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -116,10 +116,10 @@ public class BulkRequestModifierTests extends ESTestCase { }); List<BulkItemResponse> originalResponses = new ArrayList<>(); - for (ActionRequest actionRequest : bulkRequest.requests()) { + for (DocumentRequest actionRequest : bulkRequest.requests()) { IndexRequest indexRequest = (IndexRequest) actionRequest; IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), indexRequest.id(), 1, true); - originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse)); + originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse)); } bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0)); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index b04533fafc..331a956e8a 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; @@ -174,7 +174,7 @@ public class IngestActionFilterTests extends ESTestCase { int numRequest = scaledRandomIntBetween(8, 64); for (int i = 0; i < numRequest; i++) { if (rarely()) { - ActionRequest request; + DocumentRequest request; if (randomBoolean()) { request = new DeleteRequest("_index", "_type", "_id"); } else { @@ -196,7 +196,7 @@ public class IngestActionFilterTests extends ESTestCase { verifyZeroInteractions(actionListener); int assertedRequests = 0; - for (ActionRequest actionRequest : bulkRequest.requests()) { + for (DocumentRequest actionRequest : bulkRequest.requests()) { if (actionRequest instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) actionRequest; assertThat(indexRequest.sourceAsMap().size(), equalTo(2)); diff --git a/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java b/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java index 065128af91..abc07da0b3 100644 --- a/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java +++ b/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.test.ESIntegTestCase; import java.io.IOException; +import static org.elasticsearch.action.DocumentRequest.OpType; import static org.elasticsearch.client.Requests.clearIndicesCacheRequest; import static org.elasticsearch.client.Requests.getRequest; import static org.elasticsearch.client.Requests.indexRequest; @@ -190,31 +191,31 @@ public class DocumentActionsIT extends ESIntegTestCase { assertThat(bulkResponse.getItems().length, equalTo(5)); assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[0].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[0].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[0].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[0].getId(), equalTo("1")); assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[1].getOpType(), equalTo("create")); + assertThat(bulkResponse.getItems()[1].getOpType(), equalTo(OpType.CREATE)); assertThat(bulkResponse.getItems()[1].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[1].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[1].getId(), equalTo("2")); assertThat(bulkResponse.getItems()[2].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[2].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[2].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[2].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[2].getType(), equalTo("type1")); String generatedId3 = bulkResponse.getItems()[2].getId(); assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[3].getOpType(), equalTo("delete")); + assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE)); assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[3].getId(), equalTo("1")); assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true)); - assertThat(bulkResponse.getItems()[4].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1")); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 53964132ab..4cad7e5ab6 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -314,7 +314,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { int numRequest = scaledRandomIntBetween(8, 64); int numIndexRequests = 0; for (int i = 0; i < numRequest; i++) { - ActionRequest request; + DocumentRequest request; if (randomBoolean()) { if (randomBoolean()) { request = new DeleteRequest("_index", "_type", "_id"); diff --git a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java index d8cf1e7b5e..5980f781e2 100644 --- a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java +++ b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java @@ -20,6 +20,8 @@ package org.elasticsearch.routing; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -259,7 +261,7 @@ public class SimpleRoutingIT extends ESIntegTestCase { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("index")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.INDEX)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); @@ -280,7 +282,7 @@ public class SimpleRoutingIT extends ESIntegTestCase { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("update")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.UPDATE)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); @@ -301,7 +303,7 @@ public class SimpleRoutingIT extends ESIntegTestCase { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("delete")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.DELETE)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); diff --git a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java index 67e7d528e5..e43991efcc 100644 --- a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.versioning; import org.apache.lucene.util.TestUtil; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -724,7 +725,7 @@ public class SimpleVersioningIT extends ESIntegTestCase { client() .prepareIndex("test", "type", "id") .setSource("foo", "bar") - .setOpType(IndexRequest.OpType.INDEX) + .setOpType(DocumentRequest.OpType.INDEX) .setVersion(10) .setVersionType(VersionType.EXTERNAL) .execute() @@ -793,7 +794,7 @@ public class SimpleVersioningIT extends ESIntegTestCase { client() .prepareIndex("test", "type", "id") .setSource("foo", "bar") - .setOpType(IndexRequest.OpType.INDEX) + .setOpType(DocumentRequest.OpType.INDEX) .setVersion(10) .setVersionType(VersionType.EXTERNAL) .execute() |