diff options
5 files changed, 41 insertions, 23 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 70ec03c09f..ec893e8abc 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -89,6 +89,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.LongConsumer; public abstract class Engine implements Closeable { @@ -485,11 +486,7 @@ public abstract class Engine implements Closeable { } } - public final GetResult get(Get get) throws EngineException { - return get(get, this::acquireSearcher); - } - - public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException; + public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException; /** * Returns a new searcher instance. The consumer of this diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0602fb2768..f6e0d17405 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -92,6 +92,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.function.LongSupplier; public class InternalEngine extends Engine { @@ -404,7 +405,7 @@ public class InternalEngine extends Engine { } @Override - public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException { + public GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException { assert Objects.equals(get.uid().field(), uidField) : get.uid().field(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); @@ -418,7 +419,9 @@ public class InternalEngine extends Engine { throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } + long time = System.nanoTime(); refresh("realtime_get"); + onRefresh.accept(System.nanoTime() - time); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7cb8afb5cd..fcc3556e91 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -661,7 +661,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public Engine.GetResult get(Engine.Get get) { readAllowed(); - return getEngine().get(get, this::acquireSearcher); + return getEngine().get(get, this::acquireSearcher, (timeElapsed) -> refreshMetric.inc(timeElapsed)); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 42eef064a9..12a6f13b8d 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -920,7 +920,10 @@ public class InternalEngineTests extends ESTestCase { engine.index(indexForDoc(doc)); final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>(); - latestGetResult.set(engine.get(newGet(true, doc))); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true))); + assertTrue("failed to refresh", refreshed.get()); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); Thread getThread = new Thread(() -> { @@ -934,7 +937,7 @@ public class InternalEngineTests extends ESTestCase { if (previousGetResult != null) { previousGetResult.release(); } - latestGetResult.set(engine.get(newGet(true, doc))); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"))); if (latestGetResult.get().exists() == false) { break; } @@ -954,6 +957,9 @@ public class InternalEngineTests extends ESTestCase { MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); + // create a document Document document = testDocumentWithTextField(); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); @@ -967,12 +973,13 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, not there non realtime - Engine.GetResult getResult = engine.get(newGet(false, doc)); + Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); assertThat(getResult.exists(), equalTo(false)); getResult.release(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); + assertTrue("failed to refresh", refreshed.getAndSet(false)); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -987,7 +994,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // also in non realtime - getResult = engine.get(newGet(false, doc)); + getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1007,7 +1014,8 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); + assertTrue("failed to refresh", refreshed.get()); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1032,7 +1040,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, get should not see it (in realtime) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); assertThat(getResult.exists(), equalTo(false)); getResult.release(); @@ -1072,7 +1080,7 @@ public class InternalEngineTests extends ESTestCase { engine.flush(); // and, verify get (in real time) - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1858,6 +1866,8 @@ public class InternalEngineTests extends ESTestCase { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -1867,7 +1877,7 @@ public class InternalEngineTests extends ESTestCase { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); @@ -1895,6 +1905,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < thread.length; i++) { thread[i].join(); } + assertTrue("failed to refresh", refreshed.getAndSet(false)); List<OpAndVersion> sortedHistory = new ArrayList<>(history); sortedHistory.sort(Comparator.comparing(o -> o.version)); Set<String> currentValues = new HashSet<>(); @@ -1909,7 +1920,8 @@ public class InternalEngineTests extends ESTestCase { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { + assertTrue("failed to refresh", refreshed.get()); FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); @@ -2262,6 +2274,8 @@ public class InternalEngineTests extends ESTestCase { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + // Add document Document document = testDocument(); document.add(new TextField("value", "test1", Field.Store.YES)); @@ -2273,7 +2287,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document - Engine.GetResult getResult = engine.get(newGet(true, doc)); + Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); assertThat(getResult.exists(), equalTo(false)); // Give the gc pruning logic a chance to kick in @@ -2287,7 +2301,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2"))); + getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -2297,7 +2311,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should still not find the document - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=2 with a too-old version, should fail: @@ -2307,7 +2321,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should not find the document - getResult = engine.get(newGet(true, doc)); + getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); assertThat(getResult.exists(), equalTo(false)); } } @@ -3639,6 +3653,8 @@ public class InternalEngineTests extends ESTestCase { document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); final Term uid = newUid(doc); + final Function<String, Searcher> searcherFactory = engine::acquireSearcher; + final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( @@ -3700,7 +3716,8 @@ public class InternalEngineTests extends ESTestCase { } assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid))) { + try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) { + assertEquals("failed to refresh", exists, refreshed.get()); assertThat(result.exists(), equalTo(exists)); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index d1d7aebc92..99114b4819 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -298,7 +298,8 @@ public class RefreshListenersTests extends ESTestCase { listener.assertNoError(); Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); - try (Engine.GetResult getResult = engine.get(get)) { + try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher, + onRefresh -> fail("shouldn't have a refresh"))) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); SingleFieldsVisitor visitor = new SingleFieldsVisitor("test"); |