diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java | 224 |
1 files changed, 60 insertions, 164 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 25c8635a35..140cbb28c9 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; @@ -43,7 +44,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -51,7 +51,6 @@ import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -67,7 +66,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Map; import java.util.function.LongSupplier; @@ -475,20 +473,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ switch (replicaItemExecutionMode(item, i)) { case NORMAL: final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); - switch (docWriteRequest.opType()) { - case CREATE: - case INDEX: - operationResult = - executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, primaryTerm, replica); - break; - case DELETE: - operationResult = - executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, primaryTerm, replica); - break; - default: - throw new IllegalStateException("Unexpected request operation type on replica: " - + docWriteRequest.opType().getLowercase()); - } + operationResult = performOpOnReplica(primaryResponse, docWriteRequest, primaryTerm, replica); assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); break; @@ -497,12 +482,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ case FAILURE: final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; - operationResult = executeFailureNoOpOnReplica(failure, primaryTerm, replica); + operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), primaryTerm, failure.getMessage()); assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); break; default: - throw new IllegalStateException("illegal replica item execution mode for: " + item.request()); + throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest); } } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure @@ -515,6 +500,37 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ return location; } + private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest, + long primaryTerm, IndexShard replica) throws Exception { + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + final IndexRequest indexRequest = (IndexRequest) docWriteRequest; + final ShardId shardId = replica.shardId(); + final SourceToParse sourceToParse = + SourceToParse.source(shardId.getIndexName(), + indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType()) + .routing(indexRequest.routing()).parent(indexRequest.parent()); + return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(), + indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(), + indexRequest.isRetry(), sourceToParse, update -> { + throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(), + "Mappings are not available on the replica yet, triggered update: " + update); + }); + case DELETE: + DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest; + return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(), + deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery(), + update -> { + throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(), + "Mappings are not available on the replica yet, triggered update: " + update); + }); + default: + throw new IllegalStateException("Unexpected request operation type on replica: " + + docWriteRequest.opType().getLowercase()); + } + } + /** Syncs operation result to the translog or throws a shard not available failure */ private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult, final Translog.Location currentLocation) throws Exception { @@ -547,163 +563,44 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ return next; } - /** - * Execute the given {@link IndexRequest} on a replica shard, throwing a - * {@link RetryOnReplicaException} if the operation needs to be re-tried. - */ - private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request, - long primaryTerm, IndexShard replica) throws IOException { - - final Engine.Index operation; - try { - operation = prepareIndexOperationOnReplica(primaryResponse, request, primaryTerm, replica); - } catch (MapperParsingException e) { - return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo()); - } - - Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - final ShardId shardId = replica.shardId(); - throw new RetryOnReplicaException(shardId, - "Mappings are not available on the replica yet, triggered update: " + update); - } - return replica.index(operation); - } - - /** Utility method to prepare an index operation on replica shards */ - static Engine.Index prepareIndexOperationOnReplica( - DocWriteResponse primaryResponse, - IndexRequest request, - long primaryTerm, - IndexShard replica) { - - final ShardId shardId = replica.shardId(); - final long version = primaryResponse.getVersion(); - final long seqNo = primaryResponse.getSeqNo(); - final SourceToParse sourceToParse = - SourceToParse.source(shardId.getIndexName(), - request.type(), request.id(), request.source(), request.getContentType()) - .routing(request.routing()).parent(request.parent()); - final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery(); - assert versionType.validateVersionForWrites(version); - - return replica.prepareIndexOnReplica(sourceToParse, seqNo, primaryTerm, version, versionType, - request.getAutoGeneratedTimestamp(), request.isRetry()); - } - - /** Utility method to prepare an index operation on primary shards */ - private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { - final SourceToParse sourceToParse = - SourceToParse.source(request.index(), request.type(), - request.id(), request.source(), request.getContentType()) - .routing(request.routing()).parent(request.parent()); - return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), - request.getAutoGeneratedTimestamp(), request.isRetry()); - } - /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, MappingUpdatePerformer mappingUpdater) throws Exception { - // Update the mappings if parsing the documents includes new dynamic updates - final Engine.Index preUpdateOperation; - final Mapping mappingUpdate; - final boolean mappingUpdateNeeded; + final SourceToParse sourceToParse = + SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType()) + .routing(request.routing()).parent(request.parent()); try { - preUpdateOperation = prepareIndexOperationOnPrimary(request, primary); - mappingUpdate = preUpdateOperation.parsedDoc().dynamicMappingsUpdate(); - mappingUpdateNeeded = mappingUpdate != null; - if (mappingUpdateNeeded) { - mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type()); - } - } catch (MapperParsingException | IllegalArgumentException failure) { - return new Engine.IndexResult(failure, request.version()); + // if a mapping update is required to index this request, issue a mapping update on the master, and abort the + // current indexing operation so that it can be retried with the updated mapping from the master + // The early abort uses the RetryOnPrimaryException, but any other exception would be fine as well. + return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, + request.getAutoGeneratedTimestamp(), request.isRetry(), update -> { + mappingUpdater.updateMappings(update, primary.shardId(), sourceToParse.type()); + throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated"); + }); + } catch (ReplicationOperation.RetryOnPrimaryException e) { + return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, + request.getAutoGeneratedTimestamp(), request.isRetry(), update -> mappingUpdater.verifyMappings(update, primary.shardId())); } - - // Verify that there are no more mappings that need to be applied. If there are failures, a - // ReplicationOperation.RetryOnPrimaryException is thrown. - final Engine.Index operation; - if (mappingUpdateNeeded) { - try { - operation = prepareIndexOperationOnPrimary(request, primary); - mappingUpdater.verifyMappings(operation.parsedDoc().dynamicMappingsUpdate(), primary.shardId()); - } catch (MapperParsingException | IllegalStateException e) { - // there was an error in parsing the document that was not because - // of pending mapping updates, so return a failure for the result - return new Engine.IndexResult(e, request.version()); - } - } else { - // There was no mapping update, the operation is the same as the pre-update version. - operation = preUpdateOperation; - } - - return primary.index(operation); } private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary, - final MappingUpdatePerformer mappingUpdater) throws Exception { - boolean mappingUpdateNeeded = false; - if (primary.indexSettings().isSingleType()) { - // When there is a single type, the unique identifier is only composed of the _id, - // so there is no way to differenciate foo#1 from bar#1. This is especially an issue - // if a user first deletes foo#1 and then indexes bar#1: since we do not encode the - // _type in the uid it might look like we are reindexing the same document, which - // would fail if bar#1 is indexed with a lower version than foo#1 was deleted with. - // In order to work around this issue, we make deletions create types. This way, we - // fail if index and delete operations do not use the same type. - try { - Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping(); - if (update != null) { - mappingUpdateNeeded = true; + MappingUpdatePerformer mappingUpdater) throws Exception { + try { + return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(), + update -> { mappingUpdater.updateMappings(update, primary.shardId(), request.type()); - } - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.DeleteResult(e, request.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO, false); - } + throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated"); + }); + } catch (ReplicationOperation.RetryOnPrimaryException e) { + return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(), + update -> mappingUpdater.verifyMappings(update, primary.shardId())); } - if (mappingUpdateNeeded) { - Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping(); - mappingUpdater.verifyMappings(update, primary.shardId()); - } - final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); - return primary.delete(delete); - } - - private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request, - final long primaryTerm, IndexShard replica) throws Exception { - if (replica.indexSettings().isSingleType()) { - // We need to wait for the replica to have the mappings - Mapping update; - try { - update = replica.mapperService().documentMapperWithAutoCreate(request.type()).getMapping(); - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.DeleteResult(e, request.version(), primaryResponse.getSeqNo(), false); - } - if (update != null) { - final ShardId shardId = replica.shardId(); - throw new RetryOnReplicaException(shardId, - "Mappings are not available on the replica yet, triggered update: " + update); - } - } - - final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery(); - final long version = primaryResponse.getVersion(); - assert versionType.validateVersionForWrites(version); - final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), - primaryResponse.getSeqNo(), primaryTerm, version, versionType); - return replica.delete(delete); - } - - private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, long primaryTerm, - IndexShard replica) throws IOException { - final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOpOnReplica( - primaryFailure.getSeqNo(), primaryTerm, primaryFailure.getMessage()); - return replica.markSeqNoAsNoOp(noOp); } class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { - public void updateMappings(final Mapping update, final ShardId shardId, - final String type) throws Exception { + public void updateMappings(final Mapping update, final ShardId shardId, final String type) { if (update != null) { // can throw timeout exception when updating mappings or ISE for attempting to // update default mappings which are bubbled up @@ -711,8 +608,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ } } - public void verifyMappings(Mapping update, - final ShardId shardId) throws Exception { + public void verifyMappings(final Mapping update, final ShardId shardId) { if (update != null) { throw new ReplicationOperation.RetryOnPrimaryException(shardId, "Dynamic mappings are not available on the node that holds the primary yet"); |