summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/common
diff options
context:
space:
mode:
authorBoaz Leskes <b.leskes@gmail.com>2017-04-14 21:46:17 +0200
committerGitHub <noreply@github.com>2017-04-14 21:46:17 +0200
commitecf81688fb0c0d0f0f85300d0cf9026d7eb535ef (patch)
treed548aee3be05312bac3a95cd11be67b0826ad529 /core/src/test/java/org/elasticsearch/common
parent162ce85ff20498484b99a50acc9bc5d4d4f1b8e1 (diff)
Use sequence numbers to identify out of order delivery in replicas & recovery (#24060)
Internal indexing requests in Elasticsearch may be processed out of order and repeatedly. This is important during recovery and due to concurrency in replicating requests between primary and replicas. As such, a replica/recovering shard needs to be able to identify that an incoming request contains information that is old and thus need not be processed. The current logic is based on external version. This is sadly not sufficient. This PR moves the logic to rely on sequences numbers and primary terms which give the semantics we need. Relates to #10708
Diffstat (limited to 'core/src/test/java/org/elasticsearch/common')
-rw-r--r--core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java6
-rw-r--r--core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java20
2 files changed, 13 insertions, 13 deletions
diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
index d771ced56f..8b68e76957 100644
--- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
+++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
@@ -31,7 +31,7 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
+import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;
@@ -53,7 +53,7 @@ public class VersionLookupTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
LeafReaderContext segment = reader.leaves().get(0);
- PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
+ PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader());
// found doc
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
assertNotNull(result);
@@ -81,7 +81,7 @@ public class VersionLookupTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
LeafReaderContext segment = reader.leaves().get(0);
- PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
+ PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader());
// return the last doc when there are duplicates
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
assertNotNull(result);
diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
index 6b9960294e..c5e66a3bf2 100644
--- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
+++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
@@ -38,8 +38,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion;
-import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion;
+import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion;
+import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@@ -145,7 +145,7 @@ public class VersionsTests extends ESTestCase {
/** Test that version map cache works, is evicted on close, etc */
public void testCache() throws Exception {
- int size = VersionsResolver.lookupStates.size();
+ int size = VersionsAndSeqNoResolver.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
@@ -156,21 +156,21 @@ public class VersionsTests extends ESTestCase {
DirectoryReader reader = DirectoryReader.open(writer);
// should increase cache size by 1
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
- assertEquals(size+1, VersionsResolver.lookupStates.size());
+ assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
// should be cache hit
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
- assertEquals(size+1, VersionsResolver.lookupStates.size());
+ assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
- assertEquals(size, VersionsResolver.lookupStates.size());
+ assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size());
dir.close();
}
/** Test that version map cache behaves properly with a filtered reader */
public void testCacheFilterReader() throws Exception {
- int size = VersionsResolver.lookupStates.size();
+ int size = VersionsAndSeqNoResolver.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
@@ -180,17 +180,17 @@ public class VersionsTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
- assertEquals(size+1, VersionsResolver.lookupStates.size());
+ assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
// now wrap the reader
DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5));
assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6")));
// same size map: core cache key is shared
- assertEquals(size+1, VersionsResolver.lookupStates.size());
+ assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
- assertEquals(size, VersionsResolver.lookupStates.size());
+ assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size());
dir.close();
}
}