summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/index
diff options
context:
space:
mode:
authorYu <yu.liu003@gmail.com>2017-06-06 18:39:02 +0200
committerNik Everett <nik9000@gmail.com>2017-06-06 12:39:02 -0400
commit40a13345d785c9db6bc3dcd78e61e6df5971b894 (patch)
tree6d8fb5dc7692992c8ffca135644c4307a913db30 /core/src/test/java/org/elasticsearch/index
parentd6d0c13bd63f28347ebdb0f9364e44921c248b8b (diff)
Add refresh stats tracking for realtime get (#25052)
Passes a `LongConsumer` into the `Engine` during GETs which the engine calls if it refreshed to perform the get. Closes #24806
Diffstat (limited to 'core/src/test/java/org/elasticsearch/index')
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java47
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java3
2 files changed, 34 insertions, 16 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 42eef064a9..12a6f13b8d 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -920,7 +920,10 @@ public class InternalEngineTests extends ESTestCase {
engine.index(indexForDoc(doc));
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
- latestGetResult.set(engine.get(newGet(true, doc)));
+ final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+ final AtomicBoolean refreshed = new AtomicBoolean(false);
+ latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)));
+ assertTrue("failed to refresh", refreshed.get());
final AtomicBoolean flushFinished = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread getThread = new Thread(() -> {
@@ -934,7 +937,7 @@ public class InternalEngineTests extends ESTestCase {
if (previousGetResult != null) {
previousGetResult.release();
}
- latestGetResult.set(engine.get(newGet(true, doc)));
+ latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")));
if (latestGetResult.get().exists() == false) {
break;
}
@@ -954,6 +957,9 @@ public class InternalEngineTests extends ESTestCase {
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
+ final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+ final AtomicBoolean refreshed = new AtomicBoolean(false);
+
// create a document
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
@@ -967,12 +973,13 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, not there non realtime
- Engine.GetResult getResult = engine.get(newGet(false, doc));
+ Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// but, we can still get it (in realtime)
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
+ assertTrue("failed to refresh", refreshed.getAndSet(false));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@@ -987,7 +994,7 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// also in non realtime
- getResult = engine.get(newGet(false, doc));
+ getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@@ -1007,7 +1014,8 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, we can still get it (in realtime)
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
+ assertTrue("failed to refresh", refreshed.get());
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@@ -1032,7 +1040,7 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, get should not see it (in realtime)
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
@@ -1072,7 +1080,7 @@ public class InternalEngineTests extends ESTestCase {
engine.flush();
// and, verify get (in real time)
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@@ -1858,6 +1866,8 @@ public class InternalEngineTests extends ESTestCase {
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
final Term uidTerm = newUid(doc);
engine.index(indexForDoc(doc));
+ final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+ final AtomicBoolean refreshed = new AtomicBoolean(false);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
@@ -1867,7 +1877,7 @@ public class InternalEngineTests extends ESTestCase {
throw new AssertionError(e);
}
for (int op = 0; op < opsPerThread; op++) {
- try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
+ try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
@@ -1895,6 +1905,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
+ assertTrue("failed to refresh", refreshed.getAndSet(false));
List<OpAndVersion> sortedHistory = new ArrayList<>(history);
sortedHistory.sort(Comparator.comparing(o -> o.version));
Set<String> currentValues = new HashSet<>();
@@ -1909,7 +1920,8 @@ public class InternalEngineTests extends ESTestCase {
assertTrue(op.added + " should not exist", exists);
}
- try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
+ try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
+ assertTrue("failed to refresh", refreshed.get());
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
@@ -2262,6 +2274,8 @@ public class InternalEngineTests extends ESTestCase {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
engine.config().setEnableGcDeletes(false);
+ final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+
// Add document
Document document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
@@ -2273,7 +2287,7 @@ public class InternalEngineTests extends ESTestCase {
engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document
- Engine.GetResult getResult = engine.get(newGet(true, doc));
+ Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
assertThat(getResult.exists(), equalTo(false));
// Give the gc pruning logic a chance to kick in
@@ -2287,7 +2301,7 @@ public class InternalEngineTests extends ESTestCase {
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document (we never indexed uid=2):
- getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")));
+ getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));
// Try to index uid=1 with a too-old version, should fail:
@@ -2297,7 +2311,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should still not find the document
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));
// Try to index uid=2 with a too-old version, should fail:
@@ -2307,7 +2321,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should not find the document
- getResult = engine.get(newGet(true, doc));
+ getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));
}
}
@@ -3639,6 +3653,8 @@ public class InternalEngineTests extends ESTestCase {
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
final Term uid = newUid(doc);
+ final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+ final AtomicBoolean refreshed = new AtomicBoolean(false);
for (int i = 0; i < numberOfOperations; i++) {
if (randomBoolean()) {
final Engine.Index index = new Engine.Index(
@@ -3700,7 +3716,8 @@ public class InternalEngineTests extends ESTestCase {
}
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
- try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid))) {
+ try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) {
+ assertEquals("failed to refresh", exists, refreshed.get());
assertThat(result.exists(), equalTo(exists));
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index d1d7aebc92..99114b4819 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -298,7 +298,8 @@ public class RefreshListenersTests extends ESTestCase {
listener.assertNoError();
Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
- try (Engine.GetResult getResult = engine.get(get)) {
+ try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher,
+ onRefresh -> fail("shouldn't have a refresh"))) {
assertTrue("document not found", getResult.exists());
assertEquals(iteration, getResult.version());
SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");