summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java')
-rw-r--r--core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java227
1 files changed, 182 insertions, 45 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 207183bae4..c92caef628 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.lucene.Lucene;
@@ -42,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.TestCluster;
import org.junit.Before;
import java.io.IOException;
@@ -51,12 +53,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class SearchPhaseControllerTests extends ESTestCase {
@@ -75,8 +81,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+ from, size)
+ .scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
accumulatedLength += suggestionSize;
@@ -84,48 +98,71 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}
- public void testSortIsIdempotent() throws IOException {
+ public void testSortIsIdempotent() throws Exception {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
- AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
- randomBoolean() || true);
+ long randomSeed = randomLong();
+ boolean useConstantScore = randomBoolean();
+ AtomicArray<SearchPhaseResult> results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
boolean ignoreFrom = randomBoolean();
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
+ SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
+
+ results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
+ SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+ assertEquals(sortedDocs.length, sortedDocs2.length);
+ for (int i = 0; i < sortedDocs.length; i++) {
+ assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);
+ assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex);
+ assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f);
+ }
+ assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f);
+ assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits);
+ assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits);
+ }
- ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList());
- assertArrayEquals(sortedDocs, sortedDocs2);
+ private AtomicArray<SearchPhaseResult> generateSeededQueryResults(long seed, int nShards,
+ List<CompletionSuggestion> suggestions,
+ int searchHitsSize, boolean useConstantScore) throws Exception {
+ return RandomizedContext.current().runWithPrivateRandomness(seed,
+ () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore));
}
public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
+ int maxSuggestSize = 0;
for (int i = 0; i < randomIntBetween(1, 5); i++) {
- suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+ int size = randomIntBetween(1, 20);
+ maxSuggestSize += size;
+ suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size));
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
-
- // calculate offsets and score doc array
- List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
- ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
- mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
- Suggest mergedSuggest = reducedSuggest(queryResults);
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
- if (suggestion instanceof CompletionSuggestion) {
- CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
- mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
- .map(CompletionSuggestion.Entry.Option::getDoc)
- .collect(Collectors.toList()));
- }
- }
- ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
- AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest);
- InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
- searchPhaseController.reducedQueryPhase(queryResults.asList()),
+ SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false);
+ AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
+ reducedQueryPhase.suggest);
+ InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
+ reducedQueryPhase,
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
- assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+ int suggestSize = 0;
+ for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
+ Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
+ suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
+ }
+ assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
+ assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize));
Suggest suggestResult = mergedResponse.suggest();
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
assertThat(suggestion, instanceOf(CompletionSuggestion.class));
if (suggestion.getEntries().get(0).getOptions().size() > 0) {
CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
@@ -209,16 +246,6 @@ public class SearchPhaseControllerTests extends ESTestCase {
.collect(Collectors.toList()));
}
- private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException {
- List<SearchPhaseResult> resultList = results.asList();
- TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
- for (int i = 0; i < resultList.size(); i++) {
- shardTopDocs[i] = resultList.get(i).queryResult().topDocs();
- }
- int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
- return TopDocs.merge(topN, shardTopDocs).scoreDocs;
- }
-
private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
@@ -309,30 +336,96 @@ public class SearchPhaseControllerTests extends ESTestCase {
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
- CountDownLatch latch = new CountDownLatch(expectedNumResults);
+ Thread[] threads = new Thread[expectedNumResults];
for (int i = 0; i < expectedNumResults; i++) {
int id = i;
- Thread t = new Thread(() -> {
+ threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
- result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(id);
+ result.size(1);
consumer.consumeResult(result);
- latch.countDown();
});
- t.start();
+ threads[i].start();
+ }
+ for (int i = 0; i < expectedNumResults; i++) {
+ threads[i].join();
}
- latch.await();
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
}
+ public void testConsumerOnlyAggs() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]);
+ InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+ DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
+ result.aggregations(aggs);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
+ assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(0, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ }
+
+
+ public void testConsumerOnlyHits() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ if (randomBoolean()) {
+ request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
+ }
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
+ }
+
+
public void testNewSearchPhaseResults() {
for (int i = 0; i < 10; i++) {
int expectedNumResults = randomIntBetween(1, 10);
@@ -342,10 +435,22 @@ public class SearchPhaseControllerTests extends ESTestCase {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
+ final boolean hasTopDocs;
+ if ((hasTopDocs = randomBoolean())) {
+ if (request.source() != null) {
+ request.source().size(randomIntBetween(1, 100));
+ } // no source means size = 10
+ } else {
+ if (request.source() == null) {
+ request.source(new SearchSourceBuilder().size(0));
+ } else {
+ request.source().size(0);
+ }
+ }
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
- if (hasAggs && expectedNumResults > bufferSize) {
+ if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) {
assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
} else {
@@ -354,4 +459,36 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
}
}
+
+ public void testReduceTopNWithFromOffset() {
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().size(5).from(5));
+ request.setBatchedReduceSize(randomIntBetween(2, 4));
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, 4);
+ int score = 100;
+ for (int i = 0; i < 4; i++) {
+ QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i));
+ ScoreDoc[] docs = new ScoreDoc[3];
+ for (int j = 0; j < docs.length; j++) {
+ docs[j] = new ScoreDoc(0, score--);
+ }
+ result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]);
+ result.setShardIndex(i);
+ result.size(5);
+ result.from(5);
+ consumer.consumeResult(result);
+ }
+ // 4*3 results = 12 we get result 5 to 10 here with from=5 and size=5
+
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(5, reduce.scoreDocs.length);
+ assertEquals(100.f, reduce.maxScore, 0.0f);
+ assertEquals(12, reduce.totalHits);
+ assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f);
+ assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f);
+ assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f);
+ assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f);
+ assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f);
+ }
}