diff options
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java | 227 |
1 file changed, 182 insertions, 45 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 207183bae4..c92caef628 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import com.carrotsearch.randomizedtesting.RandomizedContext; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.lucene.Lucene; @@ -42,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCluster; import org.junit.Before; import java.io.IOException; @@ -51,12 +53,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class SearchPhaseControllerTests extends ESTestCase { @@ -75,8 +81,16 @@ public class SearchPhaseControllerTests extends ESTestCase { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList()); + Optional<SearchPhaseResult> first = results.asList().stream().findFirst(); + int from = 0, size = 0; + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(), + from, size) + .scoreDocs; for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); accumulatedLength += suggestionSize; @@ -84,48 +98,71 @@ public class SearchPhaseControllerTests extends ESTestCase { assertThat(sortedDocs.length, equalTo(accumulatedLength)); } - public void testSortIsIdempotent() throws IOException { + public void testSortIsIdempotent() throws Exception { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); - AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize, - randomBoolean() || true); + long randomSeed = randomLong(); + boolean useConstantScore = randomBoolean(); + AtomicArray<SearchPhaseResult> results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, + useConstantScore); boolean ignoreFrom = randomBoolean(); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList()); + Optional<SearchPhaseResult> first = results.asList().stream().findFirst(); + int from = 0, size = 0; + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } + SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; + + results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, + useConstantScore); + SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(); + ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; + assertEquals(sortedDocs.length, sortedDocs2.length); + for (int i = 0; i < sortedDocs.length; i++) { + assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc); + assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); + assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); + } + assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f); + assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits); + assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits); + } - ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList()); - assertArrayEquals(sortedDocs, sortedDocs2); + private 
AtomicArray<SearchPhaseResult> generateSeededQueryResults(long seed, int nShards, + List<CompletionSuggestion> suggestions, + int searchHitsSize, boolean useConstantScore) throws Exception { + return RandomizedContext.current().runWithPrivateRandomness(seed, + () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore)); } public void testMerge() throws IOException { List<CompletionSuggestion> suggestions = new ArrayList<>(); + int maxSuggestSize = 0; for (int i = 0; i < randomIntBetween(1, 5); i++) { - suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20))); + int size = randomIntBetween(1, 20); + maxSuggestSize += size; + suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size)); } int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); - - // calculate offsets and score doc array - List<ScoreDoc> mergedScoreDocs = new ArrayList<>(); - ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults); - mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs)); - Suggest mergedSuggest = reducedSuggest(queryResults); - for (Suggest.Suggestion<?> suggestion : mergedSuggest) { - if (suggestion instanceof CompletionSuggestion) { - CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion); - mergedScoreDocs.addAll(completionSuggestion.getOptions().stream() - .map(CompletionSuggestion.Entry.Option::getDoc) - .collect(Collectors.toList())); - } - } - ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]); - AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest); - InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, - 
searchPhaseController.reducedQueryPhase(queryResults.asList()), + SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false); + AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs, + reducedQueryPhase.suggest); + InternalSearchResponse mergedResponse = searchPhaseController.merge(false, + reducedQueryPhase, searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get); - assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length)); + int suggestSize = 0; + for (Suggest.Suggestion s : reducedQueryPhase.suggest) { + Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream(); + suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size())); + } + assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize)); + assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize)); Suggest suggestResult = mergedResponse.suggest(); - for (Suggest.Suggestion<?> suggestion : mergedSuggest) { + for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) { assertThat(suggestion, instanceOf(CompletionSuggestion.class)); if (suggestion.getEntries().get(0).getOptions().size() > 0) { CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName()); @@ -209,16 +246,6 @@ public class SearchPhaseControllerTests extends ESTestCase { .collect(Collectors.toList())); } - private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException { - List<SearchPhaseResult> resultList = results.asList(); - TopDocs[] shardTopDocs = new TopDocs[resultList.size()]; - for (int i = 0; i < resultList.size(); i++) { - shardTopDocs[i] = resultList.get(i).queryResult().topDocs(); - } - int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results)); - return TopDocs.merge(topN, 
shardTopDocs).scoreDocs; - } - private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { @@ -309,30 +336,96 @@ public class SearchPhaseControllerTests extends ESTestCase { InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); AtomicInteger max = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(expectedNumResults); + Thread[] threads = new Thread[expectedNumResults]; for (int i = 0; i < expectedNumResults; i++) { int id = i; - Thread t = new Thread(() -> { + threads[i] = new Thread(() -> { int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); - result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); result.setShardIndex(id); + result.size(1); consumer.consumeResult(result); - latch.countDown(); }); - t.start(); + threads[i].start(); + } + for (int i = 0; i < expectedNumResults; i++) { + threads[i].join(); } - latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(1, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + 
assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f); } + public void testConsumerOnlyAggs() throws InterruptedException { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + SearchRequest request = new SearchRequest(); + request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); + request.setBatchedReduceSize(bufferSize); + InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); + result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, + DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(0, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + } + + + public void testConsumerOnlyHits() throws InterruptedException { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + SearchRequest request = new SearchRequest(); + if (randomBoolean()) { + request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10))); + } + request.setBatchedReduceSize(bufferSize); + 
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); + result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertEquals(1, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f); + } + + public void testNewSearchPhaseResults() { for (int i = 0; i < 10; i++) { int expectedNumResults = randomIntBetween(1, 10); @@ -342,10 +435,22 @@ public class SearchPhaseControllerTests extends ESTestCase { if ((hasAggs = randomBoolean())) { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); } + final boolean hasTopDocs; + if ((hasTopDocs = randomBoolean())) { + if (request.source() != null) { + request.source().size(randomIntBetween(1, 100)); + } // no source means size = 10 + } else { + if (request.source() == null) { + request.source(new SearchSourceBuilder().size(0)); + } else { + request.source().size(0); + } + } request.setBatchedReduceSize(bufferSize); InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); - if (hasAggs && expectedNumResults > bufferSize) { + if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) { assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, consumer, 
instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); } else { @@ -354,4 +459,36 @@ public class SearchPhaseControllerTests extends ESTestCase { } } } + + public void testReduceTopNWithFromOffset() { + SearchRequest request = new SearchRequest(); + request.source(new SearchSourceBuilder().size(5).from(5)); + request.setBatchedReduceSize(randomIntBetween(2, 4)); + InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = + searchPhaseController.newSearchPhaseResults(request, 4); + int score = 100; + for (int i = 0; i < 4; i++) { + QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i)); + ScoreDoc[] docs = new ScoreDoc[3]; + for (int j = 0; j < docs.length; j++) { + docs[j] = new ScoreDoc(0, score--); + } + result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]); + result.setShardIndex(i); + result.size(5); + result.from(5); + consumer.consumeResult(result); + } + // 4*3 results = 12 we get result 5 to 10 here with from=5 and size=5 + + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertEquals(5, reduce.scoreDocs.length); + assertEquals(100.f, reduce.maxScore, 0.0f); + assertEquals(12, reduce.totalHits); + assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f); + assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f); + assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f); + assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f); + assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f); + } } |