summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-19 20:11:54 +0200
committerGitHub <noreply@github.com>2017-06-19 20:11:54 +0200
commit1a20760d797ddf540e4c3cefeae5e8f194774700 (patch)
treed4532157abcf4c8021238196ad9c21a1c7df6ef7 /test
parentd1be2ecfdb0910f1c365ec3d51958a4434b33eb9 (diff)
Simplify IndexShard indexing and deletion methods (#25249)
Indexing or deleting documents through the IndexShard interface is quite complex and error-prone. It requires multiple calls: e.g. first prepareIndexOnPrimary, then check whether mapping updates have occurred, then do the actual indexing using index(...), etc. Currently each consumer of the interface (local recovery, peer recovery, replication) has additional custom checks built around it to deal with mapping updates, some of which are even inconsistent. This commit aims at reducing the complexity by exposing a simpler interface on IndexShard. There are no more prepare*** methods and the mapping complexity is also hidden, while still giving callers a possibility to implement custom logic to deal with mapping updates.
Diffstat (limited to 'test')
-rw-r--r--test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java56
1 file changed, 32 insertions, 24 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 4600c80b7a..ca7fb99635 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -54,6 +55,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -81,6 +83,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
@@ -467,44 +470,49 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException {
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException {
return indexDoc(shard, type, id, "{}");
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
return indexDoc(shard, type, id, source, XContentType.JSON);
}
- protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType) throws IOException {
- final Engine.Index index;
+ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType)
+ throws IOException {
+ SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType);
if (shard.routingEntry().primary()) {
- index = shard.prepareIndexOnPrimary(
- SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
- xContentType),
- Versions.MATCH_ANY,
- VersionType.INTERNAL,
- IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
- false);
+ return shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
+ IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));
} else {
- index = shard.prepareIndexOnReplica(
- SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
- xContentType),
- shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0,
- VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+ return shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0,
+ VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, getMappingUpdater(shard, type));
}
- shard.index(index);
- return index;
}
- protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException {
- final Engine.Delete delete;
+ protected Consumer<Mapping> getMappingUpdater(IndexShard shard, String type) {
+ return update -> {
+ try {
+ updateMappings(shard, IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+ .putMapping(type, update.toString()).build());
+ } catch (IOException e) {
+ ExceptionsHelper.reThrowIfNotNull(e);
+ }
+ };
+ }
+
+ protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) {
+ shard.indexSettings().updateIndexMetaData(indexMetadata);
+ shard.mapperService().merge(indexMetadata, MapperService.MergeReason.MAPPING_UPDATE, true);
+ }
+
+ protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
if (shard.routingEntry().primary()) {
- delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);
+ return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL, update -> {});
} else {
- delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL);
+ return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(),
+ 0L, type, id, VersionType.EXTERNAL, update -> {});
}
- shard.delete(delete);
- return delete;
}
protected void flushShard(IndexShard shard) {