diff options
author | Boaz Leskes <b.leskes@gmail.com> | 2017-04-14 21:46:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-14 21:46:17 +0200 |
commit | ecf81688fb0c0d0f0f85300d0cf9026d7eb535ef (patch) | |
tree | d548aee3be05312bac3a95cd11be67b0826ad529 /core/src/test/java/org/elasticsearch/common | |
parent | 162ce85ff20498484b99a50acc9bc5d4d4f1b8e1 (diff) |
Use sequence numbers to identify out of order delivery in replicas & recovery (#24060)
Internal indexing requests in Elasticsearch may be processed out of order and repeatedly. This is important during recovery and due to concurrency in replicating requests between primary and replicas. As such, a replica/recovering shard needs to be able to identify that an incoming request contains information that is old and thus need not be processed. The current logic is based on external version. This is sadly not sufficient. This PR moves the logic to rely on sequences numbers and primary terms which give the semantics we need.
Relates to #10708
Diffstat (limited to 'core/src/test/java/org/elasticsearch/common')
-rw-r--r-- | core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java | 6 | ||||
-rw-r--r-- | core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java | 20 |
2 files changed, 13 insertions, 13 deletions
diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index d771ced56f..8b68e76957 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -31,7 +31,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.test.ESTestCase; @@ -53,7 +53,7 @@ public class VersionLookupTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // found doc DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); @@ -81,7 +81,7 @@ public class VersionLookupTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // return the last doc when there are duplicates DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 6b9960294e..c5e66a3bf2 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -38,8 +38,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -145,7 +145,7 @@ public class VersionsTests extends ESTestCase { /** Test that version map cache works, is evicted on close, etc */ public void testCache() throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -156,21 +156,21 @@ public class VersionsTests extends ESTestCase { DirectoryReader reader = DirectoryReader.open(writer); // should increase cache size by 1 assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // should be cache hit assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } /** Test that version map cache behaves properly with a filtered reader */ public void testCacheFilterReader() throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -180,17 +180,17 @@ public class VersionsTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // now wrap the reader DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5)); assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); // same size map: core cache key is shared - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } } |