summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java224
1 files changed, 60 insertions, 164 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 25c8635a35..140cbb28c9 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -33,6 +33,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
@@ -43,7 +44,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@@ -51,7 +51,6 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
@@ -67,7 +66,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
-import java.io.IOException;
import java.util.Map;
import java.util.function.LongSupplier;
@@ -475,20 +473,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
- switch (docWriteRequest.opType()) {
- case CREATE:
- case INDEX:
- operationResult =
- executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, primaryTerm, replica);
- break;
- case DELETE:
- operationResult =
- executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, primaryTerm, replica);
- break;
- default:
- throw new IllegalStateException("Unexpected request operation type on replica: "
- + docWriteRequest.opType().getLowercase());
- }
+ operationResult = performOpOnReplica(primaryResponse, docWriteRequest, primaryTerm, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
@@ -497,12 +482,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
- operationResult = executeFailureNoOpOnReplica(failure, primaryTerm, replica);
+ operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), primaryTerm, failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
- throw new IllegalStateException("illegal replica item execution mode for: " + item.request());
+ throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
@@ -515,6 +500,37 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return location;
}
+ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest,
+ long primaryTerm, IndexShard replica) throws Exception {
+ switch (docWriteRequest.opType()) {
+ case CREATE:
+ case INDEX:
+ final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
+ final ShardId shardId = replica.shardId();
+ final SourceToParse sourceToParse =
+ SourceToParse.source(shardId.getIndexName(),
+ indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
+ .routing(indexRequest.routing()).parent(indexRequest.parent());
+ return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
+ indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
+ indexRequest.isRetry(), sourceToParse, update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ case DELETE:
+ DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
+ return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
+ deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery(),
+ update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ default:
+ throw new IllegalStateException("Unexpected request operation type on replica: "
+ + docWriteRequest.opType().getLowercase());
+ }
+ }
+
/** Syncs operation result to the translog or throws a shard not available failure */
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
@@ -547,163 +563,44 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return next;
}
- /**
- * Execute the given {@link IndexRequest} on a replica shard, throwing a
- * {@link RetryOnReplicaException} if the operation needs to be re-tried.
- */
- private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request,
- long primaryTerm, IndexShard replica) throws IOException {
-
- final Engine.Index operation;
- try {
- operation = prepareIndexOperationOnReplica(primaryResponse, request, primaryTerm, replica);
- } catch (MapperParsingException e) {
- return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
- }
-
- Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
- if (update != null) {
- final ShardId shardId = replica.shardId();
- throw new RetryOnReplicaException(shardId,
- "Mappings are not available on the replica yet, triggered update: " + update);
- }
- return replica.index(operation);
- }
-
- /** Utility method to prepare an index operation on replica shards */
- static Engine.Index prepareIndexOperationOnReplica(
- DocWriteResponse primaryResponse,
- IndexRequest request,
- long primaryTerm,
- IndexShard replica) {
-
- final ShardId shardId = replica.shardId();
- final long version = primaryResponse.getVersion();
- final long seqNo = primaryResponse.getSeqNo();
- final SourceToParse sourceToParse =
- SourceToParse.source(shardId.getIndexName(),
- request.type(), request.id(), request.source(), request.getContentType())
- .routing(request.routing()).parent(request.parent());
- final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
- assert versionType.validateVersionForWrites(version);
-
- return replica.prepareIndexOnReplica(sourceToParse, seqNo, primaryTerm, version, versionType,
- request.getAutoGeneratedTimestamp(), request.isRetry());
- }
-
- /** Utility method to prepare an index operation on primary shards */
- private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
- final SourceToParse sourceToParse =
- SourceToParse.source(request.index(), request.type(),
- request.id(), request.source(), request.getContentType())
- .routing(request.routing()).parent(request.parent());
- return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(),
- request.getAutoGeneratedTimestamp(), request.isRetry());
- }
-
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
- // Update the mappings if parsing the documents includes new dynamic updates
- final Engine.Index preUpdateOperation;
- final Mapping mappingUpdate;
- final boolean mappingUpdateNeeded;
+ final SourceToParse sourceToParse =
+ SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType())
+ .routing(request.routing()).parent(request.parent());
try {
- preUpdateOperation = prepareIndexOperationOnPrimary(request, primary);
- mappingUpdate = preUpdateOperation.parsedDoc().dynamicMappingsUpdate();
- mappingUpdateNeeded = mappingUpdate != null;
- if (mappingUpdateNeeded) {
- mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type());
- }
- } catch (MapperParsingException | IllegalArgumentException failure) {
- return new Engine.IndexResult(failure, request.version());
+ // if a mapping update is required to index this request, issue a mapping update on the master, and abort the
+ // current indexing operation so that it can be retried with the updated mapping from the master
+ // The early abort uses the RetryOnPrimaryException, but any other exception would be fine as well.
+ return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
+ request.getAutoGeneratedTimestamp(), request.isRetry(), update -> {
+ mappingUpdater.updateMappings(update, primary.shardId(), sourceToParse.type());
+ throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated");
+ });
+ } catch (ReplicationOperation.RetryOnPrimaryException e) {
+ return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
+ request.getAutoGeneratedTimestamp(), request.isRetry(), update -> mappingUpdater.verifyMappings(update, primary.shardId()));
}
-
- // Verify that there are no more mappings that need to be applied. If there are failures, a
- // ReplicationOperation.RetryOnPrimaryException is thrown.
- final Engine.Index operation;
- if (mappingUpdateNeeded) {
- try {
- operation = prepareIndexOperationOnPrimary(request, primary);
- mappingUpdater.verifyMappings(operation.parsedDoc().dynamicMappingsUpdate(), primary.shardId());
- } catch (MapperParsingException | IllegalStateException e) {
- // there was an error in parsing the document that was not because
- // of pending mapping updates, so return a failure for the result
- return new Engine.IndexResult(e, request.version());
- }
- } else {
- // There was no mapping update, the operation is the same as the pre-update version.
- operation = preUpdateOperation;
- }
-
- return primary.index(operation);
}
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
- final MappingUpdatePerformer mappingUpdater) throws Exception {
- boolean mappingUpdateNeeded = false;
- if (primary.indexSettings().isSingleType()) {
- // When there is a single type, the unique identifier is only composed of the _id,
- // so there is no way to differenciate foo#1 from bar#1. This is especially an issue
- // if a user first deletes foo#1 and then indexes bar#1: since we do not encode the
- // _type in the uid it might look like we are reindexing the same document, which
- // would fail if bar#1 is indexed with a lower version than foo#1 was deleted with.
- // In order to work around this issue, we make deletions create types. This way, we
- // fail if index and delete operations do not use the same type.
- try {
- Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- if (update != null) {
- mappingUpdateNeeded = true;
+ MappingUpdatePerformer mappingUpdater) throws Exception {
+ try {
+ return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
+ update -> {
mappingUpdater.updateMappings(update, primary.shardId(), request.type());
- }
- } catch (MapperParsingException | IllegalArgumentException e) {
- return new Engine.DeleteResult(e, request.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
- }
+ throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated");
+ });
+ } catch (ReplicationOperation.RetryOnPrimaryException e) {
+ return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
+ update -> mappingUpdater.verifyMappings(update, primary.shardId()));
}
- if (mappingUpdateNeeded) {
- Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- mappingUpdater.verifyMappings(update, primary.shardId());
- }
- final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
- return primary.delete(delete);
- }
-
- private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
- final long primaryTerm, IndexShard replica) throws Exception {
- if (replica.indexSettings().isSingleType()) {
- // We need to wait for the replica to have the mappings
- Mapping update;
- try {
- update = replica.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- } catch (MapperParsingException | IllegalArgumentException e) {
- return new Engine.DeleteResult(e, request.version(), primaryResponse.getSeqNo(), false);
- }
- if (update != null) {
- final ShardId shardId = replica.shardId();
- throw new RetryOnReplicaException(shardId,
- "Mappings are not available on the replica yet, triggered update: " + update);
- }
- }
-
- final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
- final long version = primaryResponse.getVersion();
- assert versionType.validateVersionForWrites(version);
- final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
- primaryResponse.getSeqNo(), primaryTerm, version, versionType);
- return replica.delete(delete);
- }
-
- private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, long primaryTerm,
- IndexShard replica) throws IOException {
- final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOpOnReplica(
- primaryFailure.getSeqNo(), primaryTerm, primaryFailure.getMessage());
- return replica.markSeqNoAsNoOp(noOp);
}
class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {
- public void updateMappings(final Mapping update, final ShardId shardId,
- final String type) throws Exception {
+ public void updateMappings(final Mapping update, final ShardId shardId, final String type) {
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to
// update default mappings which are bubbled up
@@ -711,8 +608,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
- public void verifyMappings(Mapping update,
- final ShardId shardId) throws Exception {
+ public void verifyMappings(final Mapping update, final ShardId shardId) {
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");