diff options
author | Yu <yu.liu003@gmail.com> | 2017-06-06 18:39:02 +0200 |
---|---|---|
committer | Nik Everett <nik9000@gmail.com> | 2017-06-06 12:39:02 -0400 |
commit | 40a13345d785c9db6bc3dcd78e61e6df5971b894 (patch) | |
tree | 6d8fb5dc7692992c8ffca135644c4307a913db30 /core/src/test/java/org/elasticsearch/index | |
parent | d6d0c13bd63f28347ebdb0f9364e44921c248b8b (diff) |
Add refresh stats tracking for realtime get (#25052)
Passes a `LongConsumer` into the `Engine` during GETs which the engine
calls if it refreshed to perform the get.
Closes #24806
Diffstat (limited to 'core/src/test/java/org/elasticsearch/index')
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java | 47 | ||||
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java | 3 |
2 files changed, 34 insertions, 16 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 42eef064a9..12a6f13b8d 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -920,7 +920,10 @@ public class InternalEngineTests extends ESTestCase { engine.index(indexForDoc(doc)); final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>(); - latestGetResult.set(engine.get(newGet(true, doc))); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true))); + assertTrue("failed to refresh", refreshed.get()); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); Thread getThread = new Thread(() -> { @@ -934,7 +937,7 @@ public class InternalEngineTests extends ESTestCase { if (previousGetResult != null) { previousGetResult.release(); } - latestGetResult.set(engine.get(newGet(true, doc))); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"))); if (latestGetResult.get().exists() == false) { break; } @@ -954,6 +957,9 @@ public class InternalEngineTests extends ESTestCase { MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); + // create a document Document document = testDocumentWithTextField(); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); @@ -967,12 +973,13 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, not there non realtime - Engine.GetResult getResult = engine.get(newGet(false, doc)); + Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); assertThat(getResult.exists(), equalTo(false)); getResult.release(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); + assertTrue("failed to refresh", refreshed.getAndSet(false)); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -987,7 +994,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // also in non realtime - getResult = engine.get(newGet(false, doc)); + getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1007,7 +1014,8 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); + assertTrue("failed to refresh", refreshed.get()); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1032,7 +1040,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, get should not see it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); assertThat(getResult.exists(), equalTo(false)); getResult.release(); @@ -1072,7 +1080,7 @@ public class InternalEngineTests extends ESTestCase { engine.flush(); // and, verify get (in real time) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1858,6 +1866,8 @@ public class InternalEngineTests extends ESTestCase { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -1867,7 +1877,7 @@ public class InternalEngineTests extends ESTestCase { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); @@ -1895,6 +1905,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < thread.length; i++) { thread[i].join(); } + assertTrue("failed to refresh", refreshed.getAndSet(false)); List<OpAndVersion> sortedHistory = new ArrayList<>(history); sortedHistory.sort(Comparator.comparing(o -> o.version)); Set<String> currentValues = new HashSet<>(); @@ -1909,7 +1920,8 @@ public class InternalEngineTests extends ESTestCase { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { + assertTrue("failed to refresh", refreshed.get()); FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); @@ -2262,6 +2274,8 @@ public class InternalEngineTests extends ESTestCase { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + // Add document Document document = testDocument(); document.add(new TextField("value", "test1", Field.Store.YES)); @@ -2273,7 +2287,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document - Engine.GetResult getResult = engine.get(newGet(true, doc)); + Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); assertThat(getResult.exists(), equalTo(false)); // Give the gc pruning logic a chance to kick in @@ -2287,7 +2301,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2"))); + getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -2297,7 +2311,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should still not find the document - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=2 with a too-old version, should fail: @@ -2307,7 +2321,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should not find the document - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); } } @@ -3639,6 +3653,8 @@ public class InternalEngineTests extends ESTestCase { document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); final Term uid = newUid(doc); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( @@ -3700,7 +3716,8 @@ public class InternalEngineTests extends ESTestCase { } assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid))) { + try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) { + assertEquals("failed to refresh", exists, refreshed.get()); assertThat(result.exists(), equalTo(exists)); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index d1d7aebc92..99114b4819 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -298,7 +298,8 @@ public class RefreshListenersTests extends ESTestCase { listener.assertNoError(); Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); - try (Engine.GetResult getResult = engine.get(get)) { + try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher, + onRefresh -> fail("shouldn't have a refresh"))) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); SingleFieldsVisitor visitor = new SingleFieldsVisitor("test"); |