summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-19 20:11:54 +0200
committerGitHub <noreply@github.com>2017-06-19 20:11:54 +0200
commit1a20760d797ddf540e4c3cefeae5e8f194774700 (patch)
treed4532157abcf4c8021238196ad9c21a1c7df6ef7
parentd1be2ecfdb0910f1c365ec3d51958a4434b33eb9 (diff)
Simplify IndexShard indexing and deletion methods (#25249)
Indexing or deleting documents through the IndexShard interface is quite complex and error-prone. It requires multiple calls, e.g. first prepareIndexOnPrimary, then do some checks if mapping updates have occurred, then do the actual indexing using index(...) etc. Currently each consumer of the interface (local recovery, peer recovery, replication) has additional custom checks built around it to deal with mapping updates, some of which are even inconsistent. This commit aims at reducing the complexity by exposing a simpler interface on IndexShard. There are no more prepare*** methods and the mapping complexity is also hidden, but still giving callers a possibility to implement custom logic to deal with mapping updates.
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java4
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java224
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java9
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java214
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java73
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java5
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java23
-rw-r--r--core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java80
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java38
-rw-r--r--core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java11
-rw-r--r--core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java28
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java30
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java129
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java7
-rw-r--r--test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java56
15 files changed, 354 insertions, 577 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
index 812653d582..7f16b7c4d6 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
@@ -27,13 +27,13 @@ public interface MappingUpdatePerformer {
/**
* Update the mappings on the master.
*/
- void updateMappings(Mapping update, ShardId shardId, String type) throws Exception;
+ void updateMappings(Mapping update, ShardId shardId, String type);
/**
* Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the operation needs to be
* retried on the primary due to the mappings not being present yet, or a different exception if
* updating the mappings on the master failed.
*/
- void verifyMappings(Mapping update, ShardId shardId) throws Exception;
+ void verifyMappings(Mapping update, ShardId shardId);
}
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 25c8635a35..140cbb28c9 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -33,6 +33,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
@@ -43,7 +44,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@@ -51,7 +51,6 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
@@ -67,7 +66,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
-import java.io.IOException;
import java.util.Map;
import java.util.function.LongSupplier;
@@ -475,20 +473,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
- switch (docWriteRequest.opType()) {
- case CREATE:
- case INDEX:
- operationResult =
- executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, primaryTerm, replica);
- break;
- case DELETE:
- operationResult =
- executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, primaryTerm, replica);
- break;
- default:
- throw new IllegalStateException("Unexpected request operation type on replica: "
- + docWriteRequest.opType().getLowercase());
- }
+ operationResult = performOpOnReplica(primaryResponse, docWriteRequest, primaryTerm, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
@@ -497,12 +482,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
- operationResult = executeFailureNoOpOnReplica(failure, primaryTerm, replica);
+ operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), primaryTerm, failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
- throw new IllegalStateException("illegal replica item execution mode for: " + item.request());
+ throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
@@ -515,6 +500,37 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return location;
}
+ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest,
+ long primaryTerm, IndexShard replica) throws Exception {
+ switch (docWriteRequest.opType()) {
+ case CREATE:
+ case INDEX:
+ final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
+ final ShardId shardId = replica.shardId();
+ final SourceToParse sourceToParse =
+ SourceToParse.source(shardId.getIndexName(),
+ indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
+ .routing(indexRequest.routing()).parent(indexRequest.parent());
+ return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
+ indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
+ indexRequest.isRetry(), sourceToParse, update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ case DELETE:
+ DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
+ return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
+ deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery(),
+ update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ default:
+ throw new IllegalStateException("Unexpected request operation type on replica: "
+ + docWriteRequest.opType().getLowercase());
+ }
+ }
+
/** Syncs operation result to the translog or throws a shard not available failure */
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
@@ -547,163 +563,44 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return next;
}
- /**
- * Execute the given {@link IndexRequest} on a replica shard, throwing a
- * {@link RetryOnReplicaException} if the operation needs to be re-tried.
- */
- private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request,
- long primaryTerm, IndexShard replica) throws IOException {
-
- final Engine.Index operation;
- try {
- operation = prepareIndexOperationOnReplica(primaryResponse, request, primaryTerm, replica);
- } catch (MapperParsingException e) {
- return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
- }
-
- Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
- if (update != null) {
- final ShardId shardId = replica.shardId();
- throw new RetryOnReplicaException(shardId,
- "Mappings are not available on the replica yet, triggered update: " + update);
- }
- return replica.index(operation);
- }
-
- /** Utility method to prepare an index operation on replica shards */
- static Engine.Index prepareIndexOperationOnReplica(
- DocWriteResponse primaryResponse,
- IndexRequest request,
- long primaryTerm,
- IndexShard replica) {
-
- final ShardId shardId = replica.shardId();
- final long version = primaryResponse.getVersion();
- final long seqNo = primaryResponse.getSeqNo();
- final SourceToParse sourceToParse =
- SourceToParse.source(shardId.getIndexName(),
- request.type(), request.id(), request.source(), request.getContentType())
- .routing(request.routing()).parent(request.parent());
- final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
- assert versionType.validateVersionForWrites(version);
-
- return replica.prepareIndexOnReplica(sourceToParse, seqNo, primaryTerm, version, versionType,
- request.getAutoGeneratedTimestamp(), request.isRetry());
- }
-
- /** Utility method to prepare an index operation on primary shards */
- private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
- final SourceToParse sourceToParse =
- SourceToParse.source(request.index(), request.type(),
- request.id(), request.source(), request.getContentType())
- .routing(request.routing()).parent(request.parent());
- return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(),
- request.getAutoGeneratedTimestamp(), request.isRetry());
- }
-
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
- // Update the mappings if parsing the documents includes new dynamic updates
- final Engine.Index preUpdateOperation;
- final Mapping mappingUpdate;
- final boolean mappingUpdateNeeded;
+ final SourceToParse sourceToParse =
+ SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType())
+ .routing(request.routing()).parent(request.parent());
try {
- preUpdateOperation = prepareIndexOperationOnPrimary(request, primary);
- mappingUpdate = preUpdateOperation.parsedDoc().dynamicMappingsUpdate();
- mappingUpdateNeeded = mappingUpdate != null;
- if (mappingUpdateNeeded) {
- mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type());
- }
- } catch (MapperParsingException | IllegalArgumentException failure) {
- return new Engine.IndexResult(failure, request.version());
+ // if a mapping update is required to index this request, issue a mapping update on the master, and abort the
+ // current indexing operation so that it can be retried with the updated mapping from the master
+ // The early abort uses the RetryOnPrimaryException, but any other exception would be fine as well.
+ return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
+ request.getAutoGeneratedTimestamp(), request.isRetry(), update -> {
+ mappingUpdater.updateMappings(update, primary.shardId(), sourceToParse.type());
+ throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated");
+ });
+ } catch (ReplicationOperation.RetryOnPrimaryException e) {
+ return primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
+ request.getAutoGeneratedTimestamp(), request.isRetry(), update -> mappingUpdater.verifyMappings(update, primary.shardId()));
}
-
- // Verify that there are no more mappings that need to be applied. If there are failures, a
- // ReplicationOperation.RetryOnPrimaryException is thrown.
- final Engine.Index operation;
- if (mappingUpdateNeeded) {
- try {
- operation = prepareIndexOperationOnPrimary(request, primary);
- mappingUpdater.verifyMappings(operation.parsedDoc().dynamicMappingsUpdate(), primary.shardId());
- } catch (MapperParsingException | IllegalStateException e) {
- // there was an error in parsing the document that was not because
- // of pending mapping updates, so return a failure for the result
- return new Engine.IndexResult(e, request.version());
- }
- } else {
- // There was no mapping update, the operation is the same as the pre-update version.
- operation = preUpdateOperation;
- }
-
- return primary.index(operation);
}
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
- final MappingUpdatePerformer mappingUpdater) throws Exception {
- boolean mappingUpdateNeeded = false;
- if (primary.indexSettings().isSingleType()) {
- // When there is a single type, the unique identifier is only composed of the _id,
- // so there is no way to differenciate foo#1 from bar#1. This is especially an issue
- // if a user first deletes foo#1 and then indexes bar#1: since we do not encode the
- // _type in the uid it might look like we are reindexing the same document, which
- // would fail if bar#1 is indexed with a lower version than foo#1 was deleted with.
- // In order to work around this issue, we make deletions create types. This way, we
- // fail if index and delete operations do not use the same type.
- try {
- Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- if (update != null) {
- mappingUpdateNeeded = true;
+ MappingUpdatePerformer mappingUpdater) throws Exception {
+ try {
+ return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
+ update -> {
mappingUpdater.updateMappings(update, primary.shardId(), request.type());
- }
- } catch (MapperParsingException | IllegalArgumentException e) {
- return new Engine.DeleteResult(e, request.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
- }
+ throw new ReplicationOperation.RetryOnPrimaryException(primary.shardId(), "Mapping updated");
+ });
+ } catch (ReplicationOperation.RetryOnPrimaryException e) {
+ return primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
+ update -> mappingUpdater.verifyMappings(update, primary.shardId()));
}
- if (mappingUpdateNeeded) {
- Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- mappingUpdater.verifyMappings(update, primary.shardId());
- }
- final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
- return primary.delete(delete);
- }
-
- private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
- final long primaryTerm, IndexShard replica) throws Exception {
- if (replica.indexSettings().isSingleType()) {
- // We need to wait for the replica to have the mappings
- Mapping update;
- try {
- update = replica.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
- } catch (MapperParsingException | IllegalArgumentException e) {
- return new Engine.DeleteResult(e, request.version(), primaryResponse.getSeqNo(), false);
- }
- if (update != null) {
- final ShardId shardId = replica.shardId();
- throw new RetryOnReplicaException(shardId,
- "Mappings are not available on the replica yet, triggered update: " + update);
- }
- }
-
- final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
- final long version = primaryResponse.getVersion();
- assert versionType.validateVersionForWrites(version);
- final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
- primaryResponse.getSeqNo(), primaryTerm, version, versionType);
- return replica.delete(delete);
- }
-
- private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, long primaryTerm,
- IndexShard replica) throws IOException {
- final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOpOnReplica(
- primaryFailure.getSeqNo(), primaryTerm, primaryFailure.getMessage());
- return replica.markSeqNoAsNoOp(noOp);
}
class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {
- public void updateMappings(final Mapping update, final ShardId shardId,
- final String type) throws Exception {
+ public void updateMappings(final Mapping update, final ShardId shardId, final String type) {
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to
// update default mappings which are bubbled up
@@ -711,8 +608,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
- public void verifyMappings(Mapping update,
- final ShardId shardId) throws Exception {
+ public void verifyMappings(final Mapping update, final ShardId shardId) {
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
diff --git a/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
index 800304a95a..56311455a0 100644
--- a/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
+++ b/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.action.index;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
@@ -34,8 +35,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
-import java.util.concurrent.TimeoutException;
-
/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
@@ -77,7 +76,7 @@ public class MappingUpdatedAction extends AbstractComponent {
* Same as {@link #updateMappingOnMaster(Index, String, Mapping, TimeValue)}
* using the default timeout.
*/
- public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) throws Exception {
+ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) {
updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
}
@@ -86,9 +85,9 @@ public class MappingUpdatedAction extends AbstractComponent {
* {@code timeout}. When this method returns successfully mappings have
* been applied to the master node and propagated to data nodes.
*/
- public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) throws Exception {
+ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
- throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
+ throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 18f025c27c..6e04907d9e 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -33,6 +33,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -57,6 +58,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
+import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNotFoundException;
@@ -85,7 +87,9 @@ import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
@@ -109,6 +113,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
@@ -144,6 +149,8 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.elasticsearch.index.mapper.SourceToParse.source;
+
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
private final ThreadPool threadPool;
@@ -167,7 +174,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
- private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter;
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -260,7 +266,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
- this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -531,34 +536,47 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return previousState;
}
- public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
- boolean isRetry) {
- try {
- verifyPrimary();
- return prepareIndex(docMapper(source.type()), source, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType,
- Engine.Operation.Origin.PRIMARY, autoGeneratedIdTimestamp, isRetry);
- } catch (Exception e) {
- verifyNotClosed(e);
- throw e;
- }
+ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
+ long autoGeneratedTimestamp, boolean isRetry,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
+ return applyIndexOperation(SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
+ isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse, onMappingUpdate);
+ }
+
+ public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
+ long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
+ return applyIndexOperation(seqNo, opPrimaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
+ Engine.Operation.Origin.REPLICA, sourceToParse, onMappingUpdate);
}
- public Engine.Index prepareIndexOnReplica(SourceToParse source, long opSeqNo, long opPrimaryTerm, long version, VersionType versionType,
- long autoGeneratedIdTimestamp, boolean isRetry) {
+ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
+ long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
+ SourceToParse sourceToParse, Consumer<Mapping> onMappingUpdate) throws IOException {
+ assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
+ assert versionType.validateVersionForWrites(version);
+ ensureWriteAllowed(origin);
+ Engine.Index operation;
try {
- verifyReplicationTarget();
- assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
- return prepareIndex(docMapper(source.type()), source, opSeqNo, opPrimaryTerm, version, versionType,
- Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry);
+ operation = prepareIndex(docMapper(sourceToParse.type()), sourceToParse, seqNo, opPrimaryTerm, version, versionType, origin,
+ autoGeneratedTimeStamp, isRetry);
+ Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
+ if (update != null) {
+ // wrap this in the outer catch block, as the master might also throw a MapperParsingException when updating the mapping
+ onMappingUpdate.accept(update);
+ }
+ } catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) {
+ return new Engine.IndexResult(e, version, seqNo);
} catch (Exception e) {
verifyNotClosed(e);
throw e;
}
+
+ return index(getEngine(), operation);
}
- static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long seqNo, long primaryTerm, long version,
- VersionType versionType, Engine.Operation.Origin origin, long autoGeneratedIdTimestamp,
- boolean isRetry) {
+ public static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long seqNo, long primaryTerm, long version,
+ VersionType versionType, Engine.Operation.Origin origin, long autoGeneratedIdTimestamp, boolean isRetry) {
long startTime = System.nanoTime();
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
if (docMapper.getMapping() != null) {
@@ -573,43 +591,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}
- /**
- * Applies an engine operation to the shard, which can be either an index, delete or noop operation.
- */
- public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
- return applyOperation(getEngine(), operation);
- }
-
- private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
- switch (operation.operationType()) {
- case INDEX:
- Engine.Index engineIndex = (Engine.Index) operation;
- return index(engine, engineIndex);
- case DELETE:
- final Engine.Delete engineDelete = (Engine.Delete) operation;
- return delete(engine, engineDelete);
- case NO_OP:
- final Engine.NoOp engineNoOp = (Engine.NoOp) operation;
- return noOp(engine, engineNoOp);
- default:
- throw new IllegalStateException("No operation defined for [" + operation + "]");
- }
- }
-
- private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
- active.set(true);
- if (logger.isTraceEnabled()) {
- logger.trace("noop (seq# [{}])", noOp.seqNo());
- }
- return engine.noOp(noOp);
- }
-
- public Engine.IndexResult index(Engine.Index index) throws IOException {
- ensureWriteAllowed(index);
- Engine engine = getEngine();
- return index(engine, index);
- }
-
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
active.set(true);
final Engine.IndexResult result;
@@ -628,32 +609,66 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return result;
}
- public Engine.NoOp prepareMarkingSeqNoAsNoOpOnReplica(long seqNo, long opPrimaryTerm, String reason) {
- verifyReplicationTarget();
+ public Engine.NoOpResult markSeqNoAsNoop(long seqNo, long primaryTerm, String reason) throws IOException {
+ return markSeqNoAsNoop(seqNo, primaryTerm, reason, Engine.Operation.Origin.REPLICA);
+ }
+
+ private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason,
+ Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
long startTime = System.nanoTime();
- return new Engine.NoOp(seqNo, opPrimaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
+ ensureWriteAllowed(origin);
+ final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
+ return noOp(getEngine(), noOp);
}
- public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException {
- ensureWriteAllowed(noOp);
- Engine engine = getEngine();
+ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
+ active.set(true);
+ if (logger.isTraceEnabled()) {
+ logger.trace("noop (seq# [{}])", noOp.seqNo());
+ }
return engine.noOp(noOp);
}
- public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
- verifyPrimary();
- final Term uid = extractUidForDelete(type, id);
- return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version,
- versionType, Engine.Operation.Origin.PRIMARY);
+ public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
+ return applyDeleteOperation(SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
+ Engine.Operation.Origin.PRIMARY, onMappingUpdate);
}
- public Engine.Delete prepareDeleteOnReplica(String type, String id, long opSeqNo, long opPrimaryTerm,
- long version, VersionType versionType) {
- verifyReplicationTarget();
+ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long primaryTerm, long version, String type, String id,
+ VersionType versionType,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
+ return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA, onMappingUpdate);
+ }
+
+ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
+ VersionType versionType, Engine.Operation.Origin origin,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
+ assert versionType.validateVersionForWrites(version);
+ ensureWriteAllowed(origin);
+ if (indexSettings().isSingleType()) {
+ // When there is a single type, the unique identifier is only composed of the _id,
+ // so there is no way to differenciate foo#1 from bar#1. This is especially an issue
+ // if a user first deletes foo#1 and then indexes bar#1: since we do not encode the
+ // _type in the uid it might look like we are reindexing the same document, which
+ // would fail if bar#1 is indexed with a lower version than foo#1 was deleted with.
+ // In order to work around this issue, we make deletions create types. This way, we
+ // fail if index and delete operations do not use the same type.
+ try {
+ Mapping update = docMapper(type).getMapping();
+ if (update != null) {
+ onMappingUpdate.accept(update);
+ }
+ } catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) {
+ return new Engine.DeleteResult(e, version, seqNo, false);
+ }
+ }
final Term uid = extractUidForDelete(type, id);
- return prepareDelete(type, id, uid, opSeqNo, opPrimaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
+ final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
+ versionType, origin);
+ return delete(getEngine(), delete);
}
private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
@@ -662,12 +677,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime);
}
- public Engine.DeleteResult delete(Engine.Delete delete) throws IOException {
- ensureWriteAllowed(delete);
- Engine engine = getEngine();
- return delete(engine, delete);
- }
-
private Term extractUidForDelete(String type, String id) {
if (indexSettings.isSingleType()) {
// This is only correct because we create types dynamically on delete operations
@@ -1053,8 +1062,33 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert currentEngineReference.get() == null;
}
- public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
- return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
+ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin,
+ Consumer<Mapping> onMappingUpdate) throws IOException {
+ final Engine.Result result;
+ switch (operation.opType()) {
+ case INDEX:
+ final Translog.Index index = (Translog.Index) operation;
+ // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
+ // autoGeneratedID docs that are coming from the primary are updated correctly.
+ result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
+ index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
+ source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
+ .routing(index.routing()).parent(index.parent()), onMappingUpdate);
+ break;
+ case DELETE:
+ final Translog.Delete delete = (Translog.Delete) operation;
+ result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
+ delete.versionType().versionTypeForReplicationAndRecovery(), origin, onMappingUpdate);
+ break;
+ case NO_OP:
+ final Translog.NoOp noOp = (Translog.NoOp) operation;
+ result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
+ break;
+ default:
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
+ }
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
+ return result;
}
// package-private for testing
@@ -1066,12 +1100,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
- Engine.Operation engineOp = convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
- applyOperation(engine, engineOp);
+ applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
+ throw new IllegalArgumentException("unexpected mapping update: " + update);
+ });
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
- } catch (ElasticsearchException e) {
- if (e.status() == RestStatus.BAD_REQUEST) {
+ } catch (Exception e) {
+ if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
logger.info("ignoring recovery of a corrupt translog entry", e);
} else {
@@ -1227,11 +1262,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
- private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
- Engine.Operation.Origin origin = op.origin();
+ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (origin == Engine.Operation.Origin.PRIMARY) {
+ verifyPrimary();
if (writeAllowedStatesForPrimary.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForPrimary + ", origin [" + origin + "]");
}
@@ -1241,6 +1276,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
} else {
assert origin == Engine.Operation.Origin.REPLICA;
+ verifyReplicationTarget();
if (writeAllowedStatesForReplica.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForReplica + ", origin [" + origin + "]");
}
@@ -2048,7 +2084,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
@Override
- protected void doRun() throws Exception {
+ protected void doRun() throws IOException {
flush(new FlushRequest());
}
@@ -2070,7 +2106,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
@Override
- protected void doRun() throws Exception {
+ protected void doRun() throws IOException {
rollTranslogGeneration();
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java
deleted file mode 100644
index 372e8f4e25..0000000000
--- a/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.index.shard;
-
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.mapper.DocumentMapperForType;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.translog.Translog;
-
-import static org.elasticsearch.index.mapper.SourceToParse.source;
-
-/**
- * The TranslogOpToEngineOpConverter encapsulates all the logic needed to transform a translog entry into an
- * indexing operation including source parsing and field creation from the source.
- */
-public class TranslogOpToEngineOpConverter {
- private final MapperService mapperService;
- private final ShardId shardId;
-
- protected TranslogOpToEngineOpConverter(ShardId shardId, MapperService mapperService) {
- this.shardId = shardId;
- this.mapperService = mapperService;
- }
-
- protected DocumentMapperForType docMapper(String type) {
- return mapperService.documentMapperWithAutoCreate(type); // protected for testing
- }
-
- public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
- switch (operation.opType()) {
- case INDEX:
- final Translog.Index index = (Translog.Index) operation;
- // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
- // autoGeneratedID docs that are coming from the primary are updated correctly.
- final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
- source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
- .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
- index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
- index.getAutoGeneratedIdTimestamp(), true);
- return engineIndex;
- case DELETE:
- final Translog.Delete delete = (Translog.Delete) operation;
- final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
- delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
- origin, System.nanoTime());
- return engineDelete;
- case NO_OP:
- final Translog.NoOp noOp = (Translog.NoOp) operation;
- final Engine.NoOp engineNoOp =
- new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
- return engineNoOp;
- default:
- throw new IllegalStateException("No operation defined for [" + operation + "]");
- }
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
index 77d8b4d707..459b811552 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
@@ -467,6 +467,11 @@ public class RecoveryState implements ToXContent, Streamable {
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
}
+ public synchronized void incrementRecoveredOperations(int ops) {
+ recovered += ops;
+ assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
+ }
+
public synchronized void decrementRecoveredOperations(int ops) {
recovered -= ops;
assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + "] after decrementing [" + ops + "]";
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 6a465f1111..6bf63bcd54 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -379,29 +379,20 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
- public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws MapperException, IOException {
+ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
- // first convert all translog operations to engine operations to check for mapping updates
- List<Engine.Operation> engineOps = operations.stream().map(
- op -> {
- Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY);
- if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) {
- throw new MapperException("mapping updates are not allowed (type: [" + engineOp.type() + "], id: [" +
- ((Engine.Index) engineOp).id() + "])");
- }
- return engineOp;
- }
- ).collect(Collectors.toList());
- // actually apply engine operations
- for (Engine.Operation engineOp : engineOps) {
- indexShard().applyOperation(engineOp);
- translog.incrementRecoveredOperations();
+ for (Translog.Operation operation : operations) {
+ indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
+ throw new MapperException("mapping updates are not allowed [" + operation + "]");
+ });
}
+ // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
+ translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
return indexShard().getLocalCheckpoint();
}
diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
index 89496054a1..ec43706744 100644
--- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
@@ -51,7 +51,6 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
-import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.HashMap;
@@ -222,13 +221,13 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
- // Pretend the mappings haven't made it to the node yet, and throw a rejection
- Exception err = new ReplicationOperation.RetryOnPrimaryException(shardId, "rejection");
+ // Pretend the mappings haven't made it to the node yet, and throw a rejection
+ RuntimeException err = new ReplicationOperation.RetryOnPrimaryException(shardId, "rejection");
try {
TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
location, 0, updateHelper, threadPool::absoluteTimeInMillis,
- new ThrowingMappingUpdatePerformer(err));
+ new ThrowingVerifyingMappingUpdatePerformer(err));
fail("should have thrown a retry exception");
} catch (ReplicationOperation.RetryOnPrimaryException e) {
assertThat(e, equalTo(err));
@@ -252,7 +251,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
UpdateHelper updateHelper = null;
// Return a mapping conflict (IAE) when trying to update the mapping
- Exception err = new IllegalArgumentException("mapping conflict");
+ RuntimeException err = new IllegalArgumentException("mapping conflict");
Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData,
shard, bulkShardRequest, location, 0, updateHelper,
@@ -537,6 +536,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar")
);
final String failureMessage = "simulated primary failure";
+ final IOException exception = new IOException(failureMessage);
itemRequest.setPrimaryResponse(new BulkItemResponse(0,
randomFrom(
DocWriteRequest.OpType.CREATE,
@@ -544,7 +544,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
DocWriteRequest.OpType.INDEX
),
new BulkItemResponse.Failure("index", "type", "1",
- new IOException(failureMessage), 1L)
+ exception, 1L)
));
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
itemRequests[0] = itemRequest;
@@ -552,12 +552,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
shard.shardId(), RefreshPolicy.NONE, itemRequests);
bulkShardRequest.primaryTerm(randomIntBetween(1, (int) shard.getPrimaryTerm()));
TransportShardBulkAction.performOnReplica(bulkShardRequest, shard);
- ArgumentCaptor<Engine.NoOp> noOp = ArgumentCaptor.forClass(Engine.NoOp.class);
- verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture());
- final Engine.NoOp noOpValue = noOp.getValue();
- assertThat(noOpValue.seqNo(), equalTo(1L));
- assertThat(noOpValue.primaryTerm(), equalTo(bulkShardRequest.primaryTerm()));
- assertThat(noOpValue.reason(), containsString(failureMessage));
+ verify(shard, times(1)).markSeqNoAsNoop(1, bulkShardRequest.primaryTerm(), exception.toString());
closeShards(shard);
}
@@ -574,16 +569,14 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard,
new MappingUpdatePerformer() {
@Override
- public void updateMappings(Mapping update, ShardId shardId,
- String type) throws Exception {
+ public void updateMappings(Mapping update, ShardId shardId, String type) {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
}
@Override
- public void verifyMappings(Mapping update,
- ShardId shardId) throws Exception {
+ public void verifyMappings(Mapping update, ShardId shardId) {
// No-op, will be called
logger.info("--> verifying mappings noop");
verifyCalled.incrementAndGet();
@@ -593,9 +586,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
assertThat("mappings were \"verified\" once", verifyCalled.get(), equalTo(1));
- // Verify that the shard "prepared" the operation twice
- verify(shard, times(2)).prepareIndexOnPrimary(any(), anyLong(), any(),
- anyLong(), anyBoolean());
+ // Verify that the shard "executed" the operation twice
+ verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean(), any());
// Update the mapping, so the next mapping updater doesn't do anything
final MapperService mapperService = shard.mapperService();
@@ -605,22 +597,19 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard,
new MappingUpdatePerformer() {
@Override
- public void updateMappings(Mapping update, ShardId shardId,
- String type) throws Exception {
+ public void updateMappings(Mapping update, ShardId shardId, String type) {
fail("should not have had to update the mappings");
}
@Override
- public void verifyMappings(Mapping update,
- ShardId shardId) throws Exception {
+ public void verifyMappings(Mapping update, ShardId shardId) {
fail("should not have had to update the mappings");
}
});
- // Verify that the shard "prepared" the operation only once (2 for previous invocations plus
+ // Verify that the shard "executed" the operation only once (2 for previous invocations plus
// 1 for this execution)
- verify(shard, times(3)).prepareIndexOnPrimary(any(), anyLong(), any(),
- anyLong(), anyBoolean());
+ verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean(), any());
closeShards(shard);
}
@@ -638,25 +627,6 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
}
}
- public void testPrepareIndexOpOnReplica() throws Exception {
- IndexMetaData metaData = indexMetaData();
- IndexShard shard = newStartedShard(false);
-
- DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 17, 0, 1, randomBoolean());
- IndexRequest request = new IndexRequest("index", "type", "id")
- .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
-
- Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica(
- primaryResponse, request, shard.getPrimaryTerm(), shard);
-
- assertThat(op.version(), equalTo(primaryResponse.getVersion()));
- assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo()));
- assertThat(op.versionType(), equalTo(VersionType.EXTERNAL));
- assertThat(op.primaryTerm(), equalTo(shard.getPrimaryTerm()));
-
- closeShards(shard);
- }
-
public void testProcessUpdateResponse() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(false);
@@ -870,40 +840,40 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
/** Doesn't perform any mapping updates */
public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
- public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
+ public void updateMappings(Mapping update, ShardId shardId, String type) {
}
- public void verifyMappings(Mapping update, ShardId shardId) throws Exception {
+ public void verifyMappings(Mapping update, ShardId shardId) {
}
}
/** Always throw the given exception */
private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer {
- private final Exception e;
- ThrowingMappingUpdatePerformer(Exception e) {
+ private final RuntimeException e;
+ ThrowingMappingUpdatePerformer(RuntimeException e) {
this.e = e;
}
- public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
+ public void updateMappings(Mapping update, ShardId shardId, String type) {
throw e;
}
- public void verifyMappings(Mapping update, ShardId shardId) throws Exception {
+ public void verifyMappings(Mapping update, ShardId shardId) {
fail("should not have gotten to this point");
}
}
/** Always throw the given exception */
private class ThrowingVerifyingMappingUpdatePerformer implements MappingUpdatePerformer {
- private final Exception e;
- ThrowingVerifyingMappingUpdatePerformer(Exception e) {
+ private final RuntimeException e;
+ ThrowingVerifyingMappingUpdatePerformer(RuntimeException e) {
this.e = e;
}
- public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
+ public void updateMappings(Mapping update, ShardId shardId, String type) {
}
- public void verifyMappings(Mapping update, ShardId shardId) throws Exception {
+ public void verifyMappings(Mapping update, ShardId shardId) {
throw e;
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index af18781dfa..6cd9328558 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -89,6 +89,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
@@ -117,9 +118,9 @@ import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
+import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
-import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
@@ -180,6 +181,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
+import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
@@ -2716,8 +2718,7 @@ public class InternalEngineTests extends ESTestCase {
}
}
- public static class TranslogHandler extends TranslogOpToEngineOpConverter
- implements EngineConfig.TranslogRecoveryRunner {
+ public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
private final MapperService mapperService;
public Mapping mappingUpdate = null;
@@ -2725,7 +2726,6 @@ public class InternalEngineTests extends ESTestCase {
private final AtomicLong appliedOperations = new AtomicLong();
public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
- super(new ShardId("test", "_na_", 0), null);
NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap());
SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
@@ -2734,8 +2734,7 @@ public class InternalEngineTests extends ESTestCase {
() -> null);
}
- @Override
- protected DocumentMapperForType docMapper(String type) {
+ private DocumentMapperForType docMapper(String type) {
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
@@ -2780,6 +2779,33 @@ public class InternalEngineTests extends ESTestCase {
}
return opsRecovered;
}
+
+ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+ switch (operation.opType()) {
+ case INDEX:
+ final Translog.Index index = (Translog.Index) operation;
+ final String indexName = mapperService.index().getName();
+ final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
+ source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
+ .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
+ index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
+ index.getAutoGeneratedIdTimestamp(), true);
+ return engineIndex;
+ case DELETE:
+ final Translog.Delete delete = (Translog.Delete) operation;
+ final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
+ delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
+ origin, System.nanoTime());
+ return engineDelete;
+ case NO_OP:
+ final Translog.NoOp noOp = (Translog.NoOp) operation;
+ final Engine.NoOp engineNoOp =
+ new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
+ return engineNoOp;
+ default:
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
+ }
+ }
}
public void testRecoverFromForeignTranslog() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java
index 91a498541e..084f5f19bd 100644
--- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java
+++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java
@@ -21,7 +21,9 @@ package org.elasticsearch.index.mapper;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@@ -35,6 +37,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
public class DynamicMappingIT extends ESIntegTestCase {
@@ -144,6 +148,13 @@ public class DynamicMappingIT extends ESIntegTestCase {
assertEquals("type[bar] missing", e1.getMessage());
assertEquals("trying to auto create mapping, but dynamic mapping is disabled", e1.getCause().getMessage());
+ BulkResponse bulkResponse = client().prepareBulk().add(new IndexRequest("index_2", "bar", "2").source("field", "abc")).get();
+ assertTrue(bulkResponse.hasFailures());
+ BulkItemResponse.Failure firstFailure = bulkResponse.getItems()[0].getFailure();
+ assertThat(firstFailure.getCause(), instanceOf(TypeMissingException.class));
+ assertEquals("type[bar] missing", firstFailure.getCause().getMessage());
+ assertEquals("trying to auto create mapping, but dynamic mapping is disabled", firstFailure.getCause().getCause().getMessage());
+
// make sure no mappings were created for bar
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices("index_2").get();
assertFalse(getIndexResponse.mappings().containsKey("bar"));
diff --git a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java
index 367f79e598..854164063e 100644
--- a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java
+++ b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java
@@ -25,15 +25,17 @@ import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexableFieldType;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PostingsEnum;
-import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType;
@@ -69,7 +71,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
return pluginList(InternalSettingsPlugin.class);
}
- public void testDefaults() throws Exception {
+ public void testDefaults() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("field").field("type", "text").endObject().endObject()
.endObject().endObject().string();
@@ -185,7 +187,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
for (String option : supportedOptions.keySet()) {
jsonDoc.field(option, "1234");
}
- ParsedDocument doc = mapper.parse(SourceToParse.source("test", "type", "1", jsonDoc.endObject().bytes(),
+ ParsedDocument doc = mapper.parse(SourceToParse.source("test", "type", "1", jsonDoc.endObject().bytes(),
XContentType.JSON));
for (Map.Entry<String, IndexOptions> entry : supportedOptions.entrySet()) {
@@ -207,12 +209,13 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals(mapping, mapper.mappingSource().toString());
- ParsedDocument doc = mapper.parse(SourceToParse.source("test", "type", "1", XContentFactory.jsonBuilder()
+ SourceToParse sourceToParse = SourceToParse.source("test", "type", "1", XContentFactory.jsonBuilder()
.startObject()
.array("field", new String[] {"a", "b"})
.endObject()
.bytes(),
- XContentType.JSON));
+ XContentType.JSON);
+ ParsedDocument doc = mapper.parse(sourceToParse);
IndexableField[] fields = doc.rootDoc().getFields("field");
assertEquals(2, fields.length);
@@ -221,7 +224,8 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals("b", fields[1].stringValue());
IndexShard shard = indexService.getShard(0);
- shard.index(new Engine.Index(new Term("_id", doc.id()), doc));
+ shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+ sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -247,12 +251,13 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals(mapping, mapper.mappingSource().toString());
- ParsedDocument doc = mapper.parse(SourceToParse.source("test", "type", "1", XContentFactory.jsonBuilder()
+ SourceToParse sourceToParse = SourceToParse.source("test", "type", "1", XContentFactory.jsonBuilder()
.startObject()
- .array("field", new String[] {"a", "b"})
+ .array("field", new String[]{"a", "b"})
.endObject()
.bytes(),
- XContentType.JSON));
+ XContentType.JSON);
+ ParsedDocument doc = mapper.parse(sourceToParse);
IndexableField[] fields = doc.rootDoc().getFields("field");
assertEquals(2, fields.length);
@@ -261,7 +266,8 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals("b", fields[1].stringValue());
IndexShard shard = indexService.getShard(0);
- shard.index(new Engine.Index(new Term("_id", doc.id()), doc));
+ shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+ sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -372,7 +378,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals(mapping, mapper.mappingSource().toString());
}
- public void testTermVectors() throws Exception {
+ public void testTermVectors() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties")
.startObject("field1")
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
index 174f68da4b..41efff33ab 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -20,12 +20,12 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
-import org.apache.lucene.index.Term;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
@@ -42,6 +42,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -52,6 +53,7 @@ import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -59,7 +61,7 @@ import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
-import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
@@ -343,15 +345,9 @@ public class IndexShardIT extends ESSingleNodeTestCase {
client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldFlush());
- ParsedDocument doc = testParsedDocument(
- "1",
- "test",
- null,
- SequenceNumbersService.UNASSIGNED_SEQ_NO,
- new ParseContext.Document(),
- new BytesArray(new byte[]{1}), XContentType.JSON, null);
- Engine.Index index = new Engine.Index(new Term("_id", doc.id()), doc);
- shard.index(index);
+ shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+ SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
+ IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
assertTrue(shard.shouldFlush());
assertEquals(2, shard.getEngine().getTranslog().totalOperations());
client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
@@ -398,15 +394,9 @@ public class IndexShardIT extends ESSingleNodeTestCase {
final int numberOfDocuments = randomIntBetween(32, 128);
for (int i = 0; i < numberOfDocuments; i++) {
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
- final ParsedDocument doc = testParsedDocument(
- "1",
- "test",
- null,
- SequenceNumbersService.UNASSIGNED_SEQ_NO,
- new ParseContext.Document(),
- new BytesArray(new byte[]{1}), XContentType.JSON, null);
- final Engine.Index index = new Engine.Index(new Term("_id", doc.id()), doc);
- final Engine.IndexResult result = shard.index(index);
+ final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+ SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
+ IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
final Translog.Location location = result.getTranslogLocation();
shard.afterWriteOperation();
if (location.translogLocation + location.size > generationThreshold) {
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 5072e7a3b8..ab81f02015 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -402,13 +402,13 @@ public class IndexShardTests extends IndexShardTestCase {
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
- final String id = Integer.toString(i);
- final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
if (!rarely()) {
- final Term uid = new Term("_id", doc.id());
- final Engine.Index index =
- new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false);
- indexShard.index(index);
+ final String id = Integer.toString(i);
+ SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
+ new BytesArray("{}"), XContentType.JSON);
+ indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
+ 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
+ getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
@@ -976,10 +976,7 @@ public class IndexShardTests extends IndexShardTestCase {
});
recoveryShardFromStore(shard);
- ParsedDocument doc = testParsedDocument("1", "test", null, new ParseContext.Document(),
- new BytesArray(new byte[]{1}), null);
- Engine.Index index = new Engine.Index(new Term("_id", doc.id()), doc);
- shard.index(index);
+ indexDoc(shard, "test", "1");
assertEquals(1, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(0, postIndexUpdate.get());
@@ -988,7 +985,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, postDelete.get());
assertEquals(0, postDeleteException.get());
- shard.index(index);
+ indexDoc(shard, "test", "1");
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(1, postIndexUpdate.get());
@@ -997,8 +994,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, postDelete.get());
assertEquals(0, postDeleteException.get());
- Engine.Delete delete = new Engine.Delete("test", "1", new Term("_id", doc.id()));
- shard.delete(delete);
+ deleteDoc(shard, "test", "1");
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
@@ -1012,7 +1008,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.state = IndexShardState.STARTED; // It will generate exception
try {
- shard.index(index);
+ indexDoc(shard, "test", "1");
fail();
} catch (AlreadyClosedException e) {
@@ -1026,7 +1022,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(1, postDelete.get());
assertEquals(0, postDeleteException.get());
try {
- shard.delete(delete);
+ deleteDoc(shard, "test", "1");
fail();
} catch (AlreadyClosedException e) {
@@ -1256,14 +1252,14 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRecoverFromStoreWithNoOps() throws IOException {
final IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0");
- Engine.Index test = indexDoc(shard, "test", "1");
+ Engine.IndexResult test = indexDoc(shard, "test", "1");
// start a replica shard and index the second doc
final IndexShard otherShard = newStartedShard(false);
- test = otherShard.prepareIndexOnReplica(
- SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(),
- XContentType.JSON),
- 1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
- otherShard.index(test);
+ updateMappings(otherShard, shard.indexSettings().getIndexMetaData());
+ SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), "test", "1",
+ new BytesArray("{}"), XContentType.JSON);
+ otherShard.applyIndexOperationOnReplica(1, 1, 1,
+ VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, update -> {});
final ShardRouting primaryShardRouting = shard.routingEntry();
IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting,
@@ -1682,6 +1678,7 @@ public class IndexShardTests extends IndexShardTestCase {
null));
primary.recoverFromStore();
+ primary.state = IndexShardState.RECOVERING; // translog recovery on the next line would otherwise fail as we are in POST_RECOVERY
primary.runTranslogRecovery(primary.getEngine(), snapshot);
assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries));
assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries));
@@ -1690,61 +1687,6 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary);
}
- public void testTranslogOpToEngineOpConverter() throws IOException {
- Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .build();
- IndexMetaData metaData = IndexMetaData.builder("test")
- .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
- .settings(settings)
- .primaryTerm(0, 1).build();
- IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
- TranslogOpToEngineOpConverter converter = new TranslogOpToEngineOpConverter(primary.shardId(), primary.mapperService());
-
- Engine.Operation.Origin origin = randomFrom(Engine.Operation.Origin.values());
- // convert index op
- Translog.Index translogIndexOp = new Translog.Index(randomAlphaOfLength(10), randomAlphaOfLength(10), randomNonNegativeLong(),
- randomNonNegativeLong(), randomFrom(VersionType.values()), "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")),
- randomAlphaOfLength(5), randomAlphaOfLength(5), randomLong());
- Engine.Index engineIndexOp = (Engine.Index) converter.convertToEngineOp(translogIndexOp, origin);
- assertEquals(engineIndexOp.origin(), origin);
- assertEquals(engineIndexOp.primaryTerm(), translogIndexOp.primaryTerm());
- assertEquals(engineIndexOp.seqNo(), translogIndexOp.seqNo());
- assertEquals(engineIndexOp.version(), translogIndexOp.version());
- assertEquals(engineIndexOp.versionType(), translogIndexOp.versionType().versionTypeForReplicationAndRecovery());
- assertEquals(engineIndexOp.id(), translogIndexOp.id());
- assertEquals(engineIndexOp.type(), translogIndexOp.type());
- assertEquals(engineIndexOp.getAutoGeneratedIdTimestamp(), translogIndexOp.getAutoGeneratedIdTimestamp());
- assertEquals(engineIndexOp.parent(), translogIndexOp.parent());
- assertEquals(engineIndexOp.routing(), translogIndexOp.routing());
- assertEquals(engineIndexOp.source(), translogIndexOp.source());
-
- // convert delete op
- Translog.Delete translogDeleteOp = new Translog.Delete(randomAlphaOfLength(5), randomAlphaOfLength(5),
- new Term(randomAlphaOfLength(5), randomAlphaOfLength(5)), randomNonNegativeLong(), randomNonNegativeLong(),
- randomNonNegativeLong(), randomFrom(VersionType.values()));
- Engine.Delete engineDeleteOp = (Engine.Delete) converter.convertToEngineOp(translogDeleteOp, origin);
- assertEquals(engineDeleteOp.origin(), origin);
- assertEquals(engineDeleteOp.primaryTerm(), translogDeleteOp.primaryTerm());
- assertEquals(engineDeleteOp.seqNo(), translogDeleteOp.seqNo());
- assertEquals(engineDeleteOp.version(), translogDeleteOp.version());
- assertEquals(engineDeleteOp.versionType(), translogDeleteOp.versionType().versionTypeForReplicationAndRecovery());
- assertEquals(engineDeleteOp.id(), translogDeleteOp.id());
- assertEquals(engineDeleteOp.type(), translogDeleteOp.type());
- assertEquals(engineDeleteOp.uid(), translogDeleteOp.uid());
-
- // convert noop
- Translog.NoOp translogNoOp = new Translog.NoOp(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(5));
- Engine.NoOp engineNoOp = (Engine.NoOp) converter.convertToEngineOp(translogNoOp, origin);
- assertEquals(engineNoOp.origin(), origin);
- assertEquals(engineNoOp.primaryTerm(), translogNoOp.primaryTerm());
- assertEquals(engineNoOp.seqNo(), translogNoOp.seqNo());
- assertEquals(engineNoOp.reason(), translogNoOp.reason());
-
- closeShards(primary);
- }
-
public void testShardActiveDuringInternalRecovery() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "type", "0");
@@ -1880,22 +1822,7 @@ public class IndexShardTests extends IndexShardTestCase {
final long numDocsToDelete = randomIntBetween((int) Math.ceil(Math.nextUp(numDocs / 10.0)), Math.toIntExact(numDocs));
for (int i = 0; i < numDocs; i++) {
final String id = Integer.toString(i);
- final ParsedDocument doc =
- testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
- final Engine.Index index =
- new Engine.Index(
- new Term("_id", doc.id()),
- doc,
- SequenceNumbersService.UNASSIGNED_SEQ_NO,
- 0,
- Versions.MATCH_ANY,
- VersionType.INTERNAL,
- PRIMARY,
- System.nanoTime(),
- -1,
- false);
- final Engine.IndexResult result = indexShard.index(index);
- assertThat(result.getVersion(), equalTo(1L));
+ indexDoc(indexShard, "test", id);
}
indexShard.refresh("test");
@@ -1910,22 +1837,8 @@ public class IndexShardTests extends IndexShardTestCase {
IntStream.range(0, Math.toIntExact(numDocs)).boxed().collect(Collectors.toList()));
for (final Integer i : ids) {
final String id = Integer.toString(i);
- final ParsedDocument doc =
- testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
- final Engine.Index index =
- new Engine.Index(
- new Term("_id", doc.id()),
- doc,
- SequenceNumbersService.UNASSIGNED_SEQ_NO,
- 0,
- Versions.MATCH_ANY,
- VersionType.INTERNAL,
- PRIMARY,
- System.nanoTime(),
- -1,
- false);
- final Engine.IndexResult result = indexShard.index(index);
- assertThat(result.getVersion(), equalTo(2L));
+ deleteDoc(indexShard, "test", id);
+ indexDoc(indexShard, "test", id);
}
// flush the buffered deletes
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index a2e6785858..f8c971c440 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -24,7 +24,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
@@ -59,10 +58,10 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
final String index = replica.shardId().getIndexName();
long seqNo = 0;
for (int i = 0; i < docs; i++) {
- Engine.Index indexOp = replica.prepareIndexOnReplica(
+ replica.applyIndexOperationOnReplica(seqNo++, replica.getPrimaryTerm(), 1, VersionType.EXTERNAL,
+ IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON),
- seqNo++, replica.getPrimaryTerm(), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
- replica.index(indexOp);
+ update -> {});
if (rarely()) {
// insert a gap
seqNo++;
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 4600c80b7a..ca7fb99635 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -54,6 +55,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -81,6 +83,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
@@ -467,44 +470,49 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException {
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException {
return indexDoc(shard, type, id, "{}");
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
return indexDoc(shard, type, id, source, XContentType.JSON);
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType) throws IOException {
- final Engine.Index index;
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType)
+ throws IOException {
+ SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType);
if (shard.routingEntry().primary()) {
- index = shard.prepareIndexOnPrimary(
- SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
- xContentType),
- Versions.MATCH_ANY,
- VersionType.INTERNAL,
- IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
- false);
+ return shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
+ IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));
} else {
- index = shard.prepareIndexOnReplica(
- SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
- xContentType),
- shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0,
- VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+ return shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0,
+ VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, getMappingUpdater(shard, type));
}
- shard.index(index);
- return index;
}
- protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException {
- final Engine.Delete delete;
+ protected Consumer<Mapping> getMappingUpdater(IndexShard shard, String type) {
+ return update -> {
+ try {
+ updateMappings(shard, IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+ .putMapping(type, update.toString()).build());
+ } catch (IOException e) {
+ ExceptionsHelper.reThrowIfNotNull(e);
+ }
+ };
+ }
+
+ protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) {
+ shard.indexSettings().updateIndexMetaData(indexMetadata);
+ shard.mapperService().merge(indexMetadata, MapperService.MergeReason.MAPPING_UPDATE, true);
+ }
+
+ protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
if (shard.routingEntry().primary()) {
- delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);
+ return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL, update -> {});
} else {
- delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL);
+ return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(),
+ 0L, type, id, VersionType.EXTERNAL, update -> {});
}
- shard.delete(delete);
- return delete;
}
protected void flushShard(IndexShard shard) {