author    Simon Willnauer <simonw@apache.org>  2017-04-10 09:37:52 +0200
committer GitHub <noreply@github.com>          2017-04-10 09:37:52 +0200
commit    1f40f8a2d23759f58ab53c24628c6f340edb5de9 (patch)
tree      c5b9a50db46011ecb35b1ea2f3ea5807d0143bcb
parent    b636ca79d579dbef3965b578fa8253aa6189e263 (diff)
Introduce incremental reduction of TopDocs (#23946)
This commit adds support for incremental top N reduction if the number of expected shards in the search request is high enough. The changes here also clean up more code in SearchPhaseController to sharpen the separation between values that are identical across all shard results and values that are specific to each result. The reduced search phase result no longer holds on to an arbitrary shard result just to obtain values like `from`, `size` or sort values; these are now cleanly encapsulated.
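To make the buffering scheme concrete, here is a minimal sketch of the incremental top-N reduction this commit implements in QueryPhaseResultConsumer, boiled down to plain Lucene calls. The class and member names (IncrementalTopDocsReducer, bufferSize, topN) are illustrative only; the real consumer additionally tracks TopDocsStats and sets the shard index on each buffered TopDocs first:

import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.search.TopDocs;

// Illustrative sketch: buffer per-shard TopDocs and fold them into a partial
// merge whenever the buffer fills, so memory stays bounded by the buffer size
// rather than by the number of shards in the request.
final class IncrementalTopDocsReducer {
    private final int bufferSize; // corresponds to SearchRequest#getBatchedReduceSize()
    private final int topN;       // from + size, merged the same way a shard collects
    private final List<TopDocs> buffer = new ArrayList<>();

    IncrementalTopDocsReducer(int bufferSize, int topN) {
        this.bufferSize = bufferSize;
        this.topN = topN;
    }

    void consume(TopDocs shardTopDocs) { // called once per shard response
        buffer.add(shardTopDocs);
        if (buffer.size() == bufferSize) {
            // non-final reduce: merge the buffered top docs down to a single instance
            TopDocs reduced = TopDocs.merge(topN, buffer.toArray(new TopDocs[0]));
            buffer.clear();
            buffer.add(reduced); // the partial result occupies one slot again
        }
    }

    TopDocs reduce() { // final reduce over whatever is still buffered
        return TopDocs.merge(topN, buffer.toArray(new TopDocs[0]));
    }
}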
-rw-r--r--  core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java                    |  15
-rw-r--r--  core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java               | 299
-rw-r--r--  core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java |   5
-rw-r--r--  core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java |  18
-rw-r--r--  core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java                 |   9
-rw-r--r--  core/src/main/java/org/elasticsearch/search/SearchService.java                               |   4
-rw-r--r--  core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java                            |   2
-rw-r--r--  core/src/main/java/org/elasticsearch/search/query/QueryPhase.java                            |  59
-rw-r--r--  core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java                     |  60
-rw-r--r--  core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java           | 227
-rw-r--r--  core/src/test/java/org/elasticsearch/search/SearchServiceTests.java                          |  19
11 files changed, 490 insertions, 227 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
index 920dd1b000..a0e313f1d7 100644
--- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
+++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
@@ -98,27 +98,26 @@ final class FetchSearchPhase extends SearchPhase {
final int numShards = context.getNumShards();
final boolean isScrollSearch = context.getRequest().scroll() != null;
List<SearchPhaseResult> phaseResults = queryResults.asList();
- ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults);
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
final boolean queryAndFetchOptimization = queryResults.length() == 1;
final Runnable finishPhase = ()
- -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
+ -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
// query AND fetch optimization
finishPhase.run();
} else {
- final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
- if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+ final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
+ if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(e -> e.queryResult())
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run();
} else {
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
- searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
+ searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
@@ -188,7 +187,7 @@ final class FetchSearchPhase extends SearchPhase {
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at least one hit that didn't make it to the global topDocs
- if (context.getRequest().scroll() == null && queryResult.hasHits()) {
+ if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
try {
Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
@@ -198,11 +197,11 @@ final class FetchSearchPhase extends SearchPhase {
}
}
- private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
+ private void moveToNextPhase(SearchPhaseController searchPhaseController,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
- sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
+ reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 38d793b93e..13b4b2f73d 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
@@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -147,42 +147,47 @@ public final class SearchPhaseController extends AbstractComponent {
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param results the search phase results to obtain the sort docs from
+ * @param bufferedTopDocs the pre-consumed buffered top docs
+ * @param topDocsStats the top docs stats to fill
+ * @param from the offset into the search results top docs
+ * @param size the number of hits to return from the merged top docs
*/
- public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException {
+ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+ final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return SortedTopDocs.EMPTY;
}
- final Collection<TopDocs> topDocs = new ArrayList<>();
+ final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
- int from = -1;
- int size = -1;
- for (SearchPhaseResult sortedResult : results) {
+ for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once
/* We loop over all results once, group together the completion suggestions if there are any, and collect relevant
* top docs results. Each top docs instance gets its shard index set up front to simplify top docs merging down the road.
* This allowed us to remove some shared optimization code here since we no longer materialize a dense array of
* top docs but instead only pass relevant results / top docs to the merge method. */
QuerySearchResult queryResult = sortedResult.queryResult();
- if (queryResult.hasHits()) {
- from = queryResult.from();
- size = queryResult.size();
- TopDocs td = queryResult.topDocs();
- if (td != null && td.scoreDocs.length > 0) {
+ if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
+ final TopDocs td = queryResult.consumeTopDocs();
+ assert td != null;
+ topDocsStats.add(td);
+ if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet
setShardIndex(td, queryResult.getShardIndex());
topDocs.add(td);
}
+ }
+ if (queryResult.hasSuggestHits()) {
Suggest shardSuggest = queryResult.suggest();
- if (shardSuggest != null) {
- for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
- suggestion.setShardIndex(sortedResult.getShardIndex());
- List<Suggestion<CompletionSuggestion.Entry>> suggestions =
- groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
- suggestions.add(suggestion);
- }
+ for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+ suggestion.setShardIndex(sortedResult.getShardIndex());
+ List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+ groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggestions.add(suggestion);
}
}
}
- if (size != -1) {
- final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
+ if (hasHits) {
+ final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs;
ScoreDoc[] scoreDocs = mergedScoreDocs;
if (groupedCompletionSuggestions.isEmpty() == false) {
int numSuggestDocs = 0;
@@ -204,23 +209,35 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
}
- return scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+ if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+ TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
+ isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
+ fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
+ sortFields = fieldDocs.fields;
+ } else {
+ isSortedByField = false;
+ sortFields = null;
+ }
+ return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
} else {
- // no relevant docs - just return an empty array
- return EMPTY_DOCS;
+ // no relevant docs
+ return SortedTopDocs.EMPTY;
}
}
- private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return null;
}
+ assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
final int numShards = results.size();
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
- return topDocs.scoreDocs;
+ return topDocs;
} else if (topDocs instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs;
final Sort sort = new Sort(firstTopDocs.fields);
@@ -235,7 +252,7 @@ public final class SearchPhaseController extends AbstractComponent {
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex);
}
- return mergedTopDocs.scoreDocs;
+ return mergedTopDocs;
}
private static void setShardIndex(TopDocs topDocs, int shardIndex) {
@@ -249,12 +266,12 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
- public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
- ScoreDoc[] sortedScoreDocs, int numShards) {
- ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
- if (reducedQueryPhase.isEmpty() == false) {
+ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
+ final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
+ if (reducedQueryPhase.isEmptyResult == false) {
+ final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
// from is always zero as when we use scroll, we ignore from
- long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
+ long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
// with collapsing we can have more hits than sorted docs
size = Math.min(sortedScoreDocs.length, size);
for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -288,13 +305,13 @@ public final class SearchPhaseController extends AbstractComponent {
* Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
* completion suggestion ordered by suggestion name
*/
- public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
- ReducedQueryPhase reducedQueryPhase,
+ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- if (reducedQueryPhase.isEmpty()) {
+ if (reducedQueryPhase.isEmptyResult) {
return InternalSearchResponse.empty();
}
- SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup);
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
+ SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) {
int currentOffset = hits.getHits().length;
@@ -329,21 +346,15 @@ public final class SearchPhaseController extends AbstractComponent {
return reducedQueryPhase.buildResponse(hits);
}
- private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- boolean sorted = false;
+ final boolean sorted = reducedQueryPhase.isSortedByField;
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
int sortScoreIndex = -1;
- if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
- TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
- if (fieldDocs instanceof CollapseTopFieldDocs &&
- fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
- sorted = false;
- } else {
- sorted = true;
- for (int i = 0; i < fieldDocs.fields.length; i++) {
- if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
- sortScoreIndex = i;
- }
+ if (sorted) {
+ for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
+ if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
+ sortScoreIndex = i;
}
}
}
@@ -351,8 +362,8 @@ public final class SearchPhaseController extends AbstractComponent {
for (SearchPhaseResult entry : fetchResults) {
entry.fetchResult().initCounter();
}
- int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
- int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
+ int from = ignoreFrom ? 0 : reducedQueryPhase.from;
+ int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
// with collapsing we can have more fetch hits than sorted docs
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
// merge hits
@@ -376,7 +387,7 @@ public final class SearchPhaseController extends AbstractComponent {
searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
- searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
+ searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (sortScoreIndex != -1) {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
@@ -393,42 +404,42 @@ public final class SearchPhaseController extends AbstractComponent {
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
- public ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) {
- return reducedQueryPhase(queryResults, null, 0);
+ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
+ return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
- * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+ * @param bufferedAggs a list of pre-collected / buffered aggregations. If this list is non-null, all aggregations have been consumed
+ * from all non-null query results.
+ * @param bufferedTopDocs a list of pre-collected / buffered top docs. If this list is non-null, all top docs have been consumed
* from all non-null query results.
* @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
- List<InternalAggregations> bufferdAggs, int numReducePhases) {
+ List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
+ TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
- long totalHits = 0;
- long fetchHits = 0;
- float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
- numReducePhases);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs;
final List<InternalAggregations> aggregationsList;
- if (bufferdAggs != null) {
+ if (bufferedAggs != null) {
consumeAggs = false;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
- aggregationsList = bufferdAggs;
+ aggregationsList = bufferedAggs;
} else if (firstResult.hasAggs()) {
// the number of shards was less than the buffer size so we reduce agg results directly
aggregationsList = new ArrayList<>(queryResults.size());
@@ -443,8 +454,12 @@ public final class SearchPhaseController extends AbstractComponent {
final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
: Collections.emptyMap();
+ int from = 0;
+ int size = 0;
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
+ from = result.from();
+ size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
@@ -455,11 +470,6 @@ public final class SearchPhaseController extends AbstractComponent {
terminatedEarly = true;
}
}
- totalHits += result.topDocs().totalHits;
- fetchHits += result.topDocs().scoreDocs.length;
- if (!Float.isNaN(result.topDocs().getMaxScore())) {
- maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
- }
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -480,8 +490,11 @@ public final class SearchPhaseController extends AbstractComponent {
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
- shardResults, numReducePhases);
+ final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
+ firstResult != null ? firstResult.sortValueFormats() : null,
+ numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
}
@@ -522,8 +535,6 @@ public final class SearchPhaseController extends AbstractComponent {
final boolean timedOut;
// non null and true if at least one reduced result was terminated early
final Boolean terminatedEarly;
- // an non-null arbitrary query result if was at least one reduced result
- final QuerySearchResult oneResult;
// the reduced suggest results
final Suggest suggest;
// the reduced internal aggregations
@@ -532,10 +543,25 @@ public final class SearchPhaseController extends AbstractComponent {
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
-
- ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
- QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
- SearchProfileShardResults shardResults, int numReducePhases) {
+ // the searches merged top docs
+ final ScoreDoc[] scoreDocs;
+ // the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
+ final SortField[] sortField;
+ // <code>true</code> iff the result score docs are sorted by a field (not score); this implies that <code>sortField</code> is set.
+ final boolean isSortedByField;
+ // the size of the top hits to return
+ final int size;
+ // <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
+ final boolean isEmptyResult;
+ // the offset into the merged top hits
+ final int from;
+ // sort value formats used to sort / format the result
+ final DocValueFormat[] sortValueFormats;
+
+ ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
+ InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
+ SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
+ int from, boolean isEmptyResult) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
@@ -548,27 +574,26 @@ public final class SearchPhaseController extends AbstractComponent {
}
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
- this.oneResult = oneResult;
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
+ this.scoreDocs = scoreDocs;
+ this.sortField = sortFields;
+ this.isSortedByField = isSortedByField;
+ this.size = size;
+ this.from = from;
+ this.isEmptyResult = isEmptyResult;
+ this.sortValueFormats = sortValueFormats;
}
/**
* Creates a new search response from the given merged hits.
- * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction)
+ * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
-
- /**
- * Returns <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
- */
- public boolean isEmpty() {
- return oneResult == null;
- }
}
/**
@@ -577,12 +602,16 @@ public final class SearchPhaseController extends AbstractComponent {
* This implementation can be configured to batch up a certain amount of results and only reduce them
* iff the buffer is exhausted.
*/
- static final class QueryPhaseResultConsumer
- extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
- private final InternalAggregations[] buffer;
+ static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
+ private final InternalAggregations[] aggsBuffer;
+ private final TopDocs[] topDocsBuffer;
+ private final boolean hasAggs;
+ private final boolean hasTopDocs;
+ private final int bufferSize;
private int index;
private final SearchPhaseController controller;
private int numReducePhases = 0;
+ private final TopDocsStats topDocsStats = new TopDocsStats();
/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -591,7 +620,8 @@ public final class SearchPhaseController extends AbstractComponent {
* @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
* the buffer is used to incrementally reduce aggregation results before all shards responded.
*/
- private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) {
+ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
+ boolean hasTopDocs, boolean hasAggs) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -599,39 +629,68 @@ public final class SearchPhaseController extends AbstractComponent {
if (expectedResultSize <= bufferSize) {
throw new IllegalArgumentException("buffer size must be less than the expected result size");
}
+ if (hasAggs == false && hasTopDocs == false) {
+ throw new IllegalArgumentException("either aggs or top docs must be present");
+ }
this.controller = controller;
// no need to buffer anything if we have fewer expected results; in this case we don't consume any results ahead of time.
- this.buffer = new InternalAggregations[bufferSize];
+ this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
+ this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
+ this.hasTopDocs = hasTopDocs;
+ this.hasAggs = hasAggs;
+ this.bufferSize = bufferSize;
+
}
@Override
public void consumeResult(SearchPhaseResult result) {
super.consumeResult(result);
QuerySearchResult queryResult = result.queryResult();
- assert queryResult.hasAggs() : "this collector should only be used if aggs are requested";
consumeInternal(queryResult);
}
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
- InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
- if (index == buffer.length) {
- InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
- Arrays.fill(buffer, null);
+ if (index == bufferSize) {
+ if (hasAggs) {
+ InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
+ Arrays.fill(aggsBuffer, null);
+ aggsBuffer[0] = reducedAggs;
+ }
+ if (hasTopDocs) {
+ TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+ querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard
+ , 0);
+ Arrays.fill(topDocsBuffer, null);
+ topDocsBuffer[0] = reducedTopDocs;
+ }
numReducePhases++;
- buffer[0] = reducedAggs;
index = 1;
}
final int i = index++;
- buffer[i] = aggregations;
+ if (hasAggs) {
+ aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
+ }
+ if (hasTopDocs) {
+ final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null
+ topDocsStats.add(topDocs);
+ SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex());
+ topDocsBuffer[i] = topDocs;
+ }
+ }
+
+ private synchronized List<InternalAggregations> getRemainingAggs() {
+ return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
}
- private synchronized List<InternalAggregations> getRemaining() {
- return Arrays.asList(buffer).subList(0, index);
+ private synchronized List<TopDocs> getRemainingTopDocs() {
+ return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}
+
@Override
public ReducedQueryPhase reduce() {
- return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
+ return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
+ numReducePhases, false);
}
/**
@@ -649,17 +708,49 @@ public final class SearchPhaseController extends AbstractComponent {
*/
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
- if (source != null && source.aggregations() != null) {
+ boolean isScrollRequest = request.scroll() != null;
+ final boolean hasAggs = source != null && source.aggregations() != null;
+ final boolean hasTopDocs = source == null || source.size() != 0;
+
+ if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
+ // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
- return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
+ return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
@Override
public ReducedQueryPhase reduce() {
- return reducedQueryPhase(results.asList());
+ return reducedQueryPhase(results.asList(), isScrollRequest);
}
};
}
+
+ static final class TopDocsStats {
+ long totalHits;
+ long fetchHits;
+ float maxScore = Float.NEGATIVE_INFINITY;
+
+ void add(TopDocs topDocs) {
+ totalHits += topDocs.totalHits;
+ fetchHits += topDocs.scoreDocs.length;
+ if (!Float.isNaN(topDocs.getMaxScore())) {
+ maxScore = Math.max(maxScore, topDocs.getMaxScore());
+ }
+ }
+ }
+
+ static final class SortedTopDocs {
+ static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
+ final ScoreDoc[] scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+
+ SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
+ this.scoreDocs = scoreDocs;
+ this.isSortedByField = isSortedByField;
+ this.sortFields = sortFields;
+ }
+ }
}
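As the tests later in this change exercise via request.setBatchedReduceSize(bufferSize), callers opt into incremental reduction simply by making the batched reduce size smaller than the number of shards. A rough usage sketch (the index name and the chosen sizes are arbitrary):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Illustrative helper, not part of the commit:
final class BatchedReduceExample {
    static SearchRequest buildRequest() {
        SearchRequest request = new SearchRequest("my-index"); // hypothetical index
        request.source(new SearchSourceBuilder().size(10));
        // buffer at most 64 shard results before a partial reduce; incremental
        // reduction only kicks in when this is smaller than the shard count
        request.setBatchedReduceSize(64);
        return request;
    }
}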
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
index c39a9fe6f2..b3ebaed3cb 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
@@ -173,9 +173,8 @@ final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private void innerFinishHim() throws Exception {
List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
- ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList());
- final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
- searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true,
+ searchPhaseController.reducedQueryPhase(queryFetchSearchResults, true), queryFetchSearchResults, queryFetchResults::get);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
index 37071485a0..709738dcaf 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
@@ -55,7 +55,6 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
- private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
@@ -171,16 +170,15 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
private void executeFetchPhase() throws Exception {
- sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList());
- if (sortedShardDocs.length == 0) {
- finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
+ final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(),
+ true);
+ if (reducedQueryPhase.scoreDocs.length == 0) {
+ finishHim(reducedQueryPhase);
return;
}
- final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
- SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
- final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
- queryResults.length());
+ final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), reducedQueryPhase.scoreDocs);
+ final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, queryResults.length());
final CountDown counter = new CountDown(docIdsToLoad.length);
for (int i = 0; i < docIdsToLoad.length; i++) {
final int index = i;
@@ -222,8 +220,8 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
try {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase,
- fetchResults.asList(), fetchResults::get);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
+ fetchResults::get);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
index 2bf5e50a1c..fa82aa0ac6 100644
--- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
+++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
@@ -31,14 +31,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* to get the concrete values as a list using {@link #asList()}.
*/
public class AtomicArray<E> {
-
- private static final AtomicArray EMPTY = new AtomicArray(0);
-
- @SuppressWarnings("unchecked")
- public static <E> E empty() {
- return (E) EMPTY;
- }
-
private final AtomicReferenceArray<E> array;
private volatile List<E> nonNullList;
@@ -53,7 +45,6 @@ public class AtomicArray<E> {
return array.length();
}
-
/**
* Sets the element at position {@code i} to the given value.
*
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java
index a035228195..e601cec0fe 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -259,7 +259,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
loadOrExecuteQueryPhase(request, context);
- if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+ if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
@@ -341,7 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
- if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+ if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
index 83af0b9abd..97f2681252 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -166,7 +166,7 @@ public class FetchPhase implements SearchPhase {
fetchSubPhase.hitsExecute(context, hits);
}
- context.fetchResult().hits(new SearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
+ context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
}
private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
index 13f32f74d0..272c57fe98 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
@@ -142,7 +142,6 @@ public class QueryPhase implements SearchPhase {
queryResult.searchTimedOut(false);
final boolean doProfile = searchContext.getProfilers() != null;
- final SearchType searchType = searchContext.searchType();
boolean rescore = false;
try {
queryResult.from(searchContext.from());
@@ -165,12 +164,7 @@ public class QueryPhase implements SearchPhase {
if (searchContext.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
}
- topDocsCallable = new Callable<TopDocs>() {
- @Override
- public TopDocs call() throws Exception {
- return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
- }
- };
+ topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
} else {
// Perhaps have a dedicated scroll phase?
final ScrollContext scrollContext = searchContext.scrollContext();
@@ -238,38 +232,35 @@ public class QueryPhase implements SearchPhase {
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
}
- topDocsCallable = new Callable<TopDocs>() {
- @Override
- public TopDocs call() throws Exception {
- final TopDocs topDocs;
- if (topDocsCollector instanceof TopDocsCollector) {
- topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
- } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
- topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
+ topDocsCallable = () -> {
+ final TopDocs topDocs;
+ if (topDocsCollector instanceof TopDocsCollector) {
+ topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
+ } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
+ topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
+ } else {
+ throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
+ }
+ if (scrollContext != null) {
+ if (scrollContext.totalHits == -1) {
+ // first round
+ scrollContext.totalHits = topDocs.totalHits;
+ scrollContext.maxScore = topDocs.getMaxScore();
} else {
- throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
+ // subsequent round: the total number of hits and
+ // the maximum score were computed on the first round
+ topDocs.totalHits = scrollContext.totalHits;
+ topDocs.setMaxScore(scrollContext.maxScore);
}
- if (scrollContext != null) {
- if (scrollContext.totalHits == -1) {
- // first round
- scrollContext.totalHits = topDocs.totalHits;
- scrollContext.maxScore = topDocs.getMaxScore();
- } else {
- // subsequent round: the total number of hits and
- // the maximum score were computed on the first round
- topDocs.totalHits = scrollContext.totalHits;
- topDocs.setMaxScore(scrollContext.maxScore);
- }
- if (searchContext.request().numberOfShards() == 1) {
- // if we fetch the document in the same roundtrip, we already know the last emitted doc
- if (topDocs.scoreDocs.length > 0) {
- // set the last emitted doc
- scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
- }
+ if (searchContext.request().numberOfShards() == 1) {
+ // if we fetch the document in the same roundtrip, we already know the last emitted doc
+ if (topDocs.scoreDocs.length > 0) {
+ // set the last emitted doc
+ scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
}
}
- return topDocs;
}
+ return topDocs;
};
}
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index 15403f9967..f071c62f12 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -55,6 +55,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
private Boolean terminatedEarly = null;
private ProfileShardResult profileShardResults;
private boolean hasProfileResults;
+ private boolean hasScoreDocs;
+ private int totalHits;
+ private float maxScore;
public QuerySearchResult() {
}
@@ -87,11 +90,34 @@ public final class QuerySearchResult extends SearchPhaseResult {
}
public TopDocs topDocs() {
+ if (topDocs == null) {
+ throw new IllegalStateException("topDocs already consumed");
+ }
+ return topDocs;
+ }
+
+ /**
+ * Returns <code>true</code> iff the top docs have already been consumed.
+ */
+ public boolean hasConsumedTopDocs() {
+ return topDocs == null;
+ }
+
+ /**
+ * Returns and nulls out the top docs for this search result. This allows freeing up memory once the top docs are consumed.
+ * @throws IllegalStateException if the top docs have already been consumed.
+ */
+ public TopDocs consumeTopDocs() {
+ TopDocs topDocs = this.topDocs;
+ if (topDocs == null) {
+ throw new IllegalStateException("topDocs already consumed");
+ }
+ this.topDocs = null;
return topDocs;
}
public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) {
- this.topDocs = topDocs;
+ setTopDocs(topDocs);
if (topDocs.scoreDocs.length > 0 && topDocs.scoreDocs[0] instanceof FieldDoc) {
int numFields = ((FieldDoc) topDocs.scoreDocs[0]).fields.length;
if (numFields != sortValueFormats.length) {
@@ -102,12 +128,19 @@ public final class QuerySearchResult extends SearchPhaseResult {
this.sortValueFormats = sortValueFormats;
}
+ private void setTopDocs(TopDocs topDocs) {
+ this.topDocs = topDocs;
+ hasScoreDocs = topDocs.scoreDocs.length > 0;
+ this.totalHits = topDocs.totalHits;
+ this.maxScore = topDocs.getMaxScore();
+ }
+
public DocValueFormat[] sortValueFormats() {
return sortValueFormats;
}
/**
- * Retruns <code>true</code> if this query result has unconsumed aggregations
+ * Returns <code>true</code> if this query result has unconsumed aggregations
*/
public boolean hasAggs() {
return hasAggs;
@@ -195,10 +228,15 @@ public final class QuerySearchResult extends SearchPhaseResult {
return this;
}
- /** Returns true iff the result has hits */
- public boolean hasHits() {
- return (topDocs != null && topDocs.scoreDocs.length > 0) ||
- (suggest != null && suggest.hasScoreDocs());
+ /**
+ * Returns <code>true</code> if this result has any suggest score docs
+ */
+ public boolean hasSuggestHits() {
+ return (suggest != null && suggest.hasScoreDocs());
+ }
+
+ public boolean hasSearchContext() {
+ return hasScoreDocs || hasSuggestHits();
}
public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
@@ -227,7 +265,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class);
}
}
- topDocs = readTopDocs(in);
+ setTopDocs(readTopDocs(in));
if (hasAggs = in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
@@ -278,4 +316,12 @@ public final class QuerySearchResult extends SearchPhaseResult {
out.writeOptionalBoolean(terminatedEarly);
out.writeOptionalWriteable(profileShardResults);
}
+
+ public int getTotalHits() {
+ return totalHits;
+ }
+
+ public float getMaxScore() {
+ return maxScore;
+ }
}
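The consume-once accessors above keep the scalar statistics (total hits, max score) alive while releasing the heavy TopDocs reference as soon as it has been handed off to the reducer. A minimal sketch of the same pattern in isolation, using a hypothetical holder class:

import org.apache.lucene.search.TopDocs;

// Illustrative stand-in for QuerySearchResult's consume-once semantics:
// the stats survive, the score docs become eligible for GC after consumption.
final class TopDocsHolder {
    private TopDocs topDocs;
    private int totalHits;
    private float maxScore;

    void set(TopDocs topDocs) {
        this.topDocs = topDocs;
        this.totalHits = topDocs.totalHits;
        this.maxScore = topDocs.getMaxScore();
    }

    TopDocs consume() {
        TopDocs td = this.topDocs;
        if (td == null) {
            throw new IllegalStateException("topDocs already consumed");
        }
        this.topDocs = null; // release the score docs; stats remain readable
        return td;
    }

    int getTotalHits() { return totalHits; }
    float getMaxScore() { return maxScore; }
}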
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 207183bae4..c92caef628 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.lucene.Lucene;
@@ -42,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.TestCluster;
import org.junit.Before;
import java.io.IOException;
@@ -51,12 +53,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class SearchPhaseControllerTests extends ESTestCase {
@@ -75,8 +81,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+ from, size)
+ .scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
accumulatedLength += suggestionSize;
@@ -84,48 +98,71 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}
- public void testSortIsIdempotent() throws IOException {
+ public void testSortIsIdempotent() throws Exception {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
- AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
- randomBoolean() || true);
+ long randomSeed = randomLong();
+ boolean useConstantScore = randomBoolean();
+ AtomicArray<SearchPhaseResult> results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
boolean ignoreFrom = randomBoolean();
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
+ SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
+
+ results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
+ SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+ assertEquals(sortedDocs.length, sortedDocs2.length);
+ for (int i = 0; i < sortedDocs.length; i++) {
+ assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);
+ assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex);
+ assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f);
+ }
+ assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f);
+ assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits);
+ assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits);
+ }
- ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList());
- assertArrayEquals(sortedDocs, sortedDocs2);
+ private AtomicArray<SearchPhaseResult> generateSeededQueryResults(long seed, int nShards,
+ List<CompletionSuggestion> suggestions,
+ int searchHitsSize, boolean useConstantScore) throws Exception {
+ return RandomizedContext.current().runWithPrivateRandomness(seed,
+ () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore));
}
public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
+ int maxSuggestSize = 0;
for (int i = 0; i < randomIntBetween(1, 5); i++) {
- suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+ int size = randomIntBetween(1, 20);
+ maxSuggestSize += size;
+ suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size));
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
-
- // calculate offsets and score doc array
- List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
- ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
- mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
- Suggest mergedSuggest = reducedSuggest(queryResults);
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
- if (suggestion instanceof CompletionSuggestion) {
- CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
- mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
- .map(CompletionSuggestion.Entry.Option::getDoc)
- .collect(Collectors.toList()));
- }
- }
- ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
- AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest);
- InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
- searchPhaseController.reducedQueryPhase(queryResults.asList()),
+ SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false);
+ AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
+ reducedQueryPhase.suggest);
+ InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
+ reducedQueryPhase,
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
- assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+ int suggestSize = 0;
+ for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
+ Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
+ suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
+ }
+ assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
+ assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize));
Suggest suggestResult = mergedResponse.suggest();
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
assertThat(suggestion, instanceOf(CompletionSuggestion.class));
if (suggestion.getEntries().get(0).getOptions().size() > 0) {
CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
@@ -209,16 +246,6 @@ public class SearchPhaseControllerTests extends ESTestCase {
.collect(Collectors.toList()));
}
- private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException {
- List<SearchPhaseResult> resultList = results.asList();
- TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
- for (int i = 0; i < resultList.size(); i++) {
- shardTopDocs[i] = resultList.get(i).queryResult().topDocs();
- }
- int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
- return TopDocs.merge(topN, shardTopDocs).scoreDocs;
- }
-
private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
@@ -309,30 +336,96 @@ public class SearchPhaseControllerTests extends ESTestCase {
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
- CountDownLatch latch = new CountDownLatch(expectedNumResults);
+ Thread[] threads = new Thread[expectedNumResults];
for (int i = 0; i < expectedNumResults; i++) {
int id = i;
- Thread t = new Thread(() -> {
+ threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
- result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(id);
+ result.size(1);
consumer.consumeResult(result);
- latch.countDown();
});
- t.start();
+ threads[i].start();
+ }
+ for (int i = 0; i < expectedNumResults; i++) {
+ threads[i].join();
}
- latch.await();
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
}
+ public void testConsumerOnlyAggs() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]);
+ InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+ DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
+ result.aggregations(aggs);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
+ assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(0, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ }
+
+ public void testConsumerOnlyHits() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ if (randomBoolean()) {
+ request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
+ }
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
+ }
+
public void testNewSearchPhaseResults() {
for (int i = 0; i < 10; i++) {
int expectedNumResults = randomIntBetween(1, 10);
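The consumer tests above (threaded, aggs-only, hits-only) all drive the same buffered path: once more than bufferSize shard results arrive, partial reduces keep the held state bounded. As a rough model of the top-docs half of that, assuming only that Lucene's TopDocs.merge(topN, TopDocs[]) is the per-batch merge (the real QueryPhaseResultConsumer also folds aggregations and tracks shard indices, which this sketch omits):

    import java.util.Arrays;
    import org.apache.lucene.search.TopDocs;

    // Sketch of incremental top-N reduction: never hold more than bufferSize
    // TopDocs at once; collapse the buffer into one merged TopDocs when full.
    final class IncrementalTopDocsReducer {
        private final TopDocs[] buffer;
        private final int topN;
        private int count;

        IncrementalTopDocsReducer(int bufferSize, int topN) {
            this.buffer = new TopDocs[bufferSize];
            this.topN = topN;
        }

        synchronized void consume(TopDocs shardDocs) {
            if (count == buffer.length) {
                // partial reduce: buffer is full, merge it down to one entry;
                // TopDocs.merge also sums totalHits and tracks the max score
                TopDocs merged = TopDocs.merge(topN, Arrays.copyOf(buffer, count));
                Arrays.fill(buffer, null);
                buffer[0] = merged;
                count = 1;
            }
            buffer[count++] = shardDocs;
        }

        synchronized TopDocs reduce() {
            // final reduce over whatever is still buffered
            return TopDocs.merge(topN, Arrays.copyOf(buffer, count));
        }
    }

The synchronized methods mirror what the threaded test exercises: consumeResult may be called concurrently from one thread per shard result.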
@@ -342,10 +435,22 @@ public class SearchPhaseControllerTests extends ESTestCase {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
+ final boolean hasTopDocs;
+ if ((hasTopDocs = randomBoolean())) {
+ if (request.source() != null) {
+ request.source().size(randomIntBetween(1, 100));
+ } // no source means size = 10
+ } else {
+ if (request.source() == null) {
+ request.source(new SearchSourceBuilder().size(0));
+ } else {
+ request.source().size(0);
+ }
+ }
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
- if (hasAggs && expectedNumResults > bufferSize) {
+ if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) {
assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
} else {
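The widened expectation encodes the dispatch rule this commit introduces: buffering is only worth setting up when a partial reduce can actually shrink state. A paraphrase of the condition the test asserts (illustrative method, not the controller's code):

    // When true, expect the buffering QueryPhaseResultConsumer; otherwise the
    // plain pass-through consumer is sufficient.
    static boolean useBufferedConsumer(boolean hasAggs, boolean hasTopDocs,
                                       int expectedNumResults, int bufferSize) {
        return (hasAggs || hasTopDocs) && expectedNumResults > bufferSize;
    }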
@@ -354,4 +459,36 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
}
}
+
+ public void testReduceTopNWithFromOffset() {
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().size(5).from(5));
+ request.setBatchedReduceSize(randomIntBetween(2, 4));
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, 4);
+ int score = 100;
+ for (int i = 0; i < 4; i++) {
+ QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i));
+ ScoreDoc[] docs = new ScoreDoc[3];
+ for (int j = 0; j < docs.length; j++) {
+ docs[j] = new ScoreDoc(0, score--);
+ }
+ result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]);
+ result.setShardIndex(i);
+ result.size(5);
+ result.from(5);
+ consumer.consumeResult(result);
+ }
+ // 4 shards * 3 docs = 12 hits in total; from=5 and size=5 selects results 6 through 10 (scores 95..91)
+
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(5, reduce.scoreDocs.length);
+ assertEquals(100.f, reduce.maxScore, 0.0f);
+ assertEquals(12, reduce.totalHits);
+ assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f);
+ assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f);
+ assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f);
+ assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f);
+ assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f);
+ }
}
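To sanity-check testReduceTopNWithFromOffset: the four shards contribute 12 docs with strictly decreasing scores 100 down to 89, so from=5 skips scores 100..96 and size=5 keeps 95..91, which is exactly what the assertions expect. A throwaway verification of that arithmetic in plain Java (no Elasticsearch types needed):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class FromSizeWindowCheck {
        public static void main(String[] args) {
            List<Float> scores = new ArrayList<>();
            float score = 100;
            for (int shard = 0; shard < 4; shard++) {   // 4 shard results
                for (int doc = 0; doc < 3; doc++) {     // 3 docs per shard
                    scores.add(score--);                // 100, 99, ..., 89
                }
            }
            scores.sort(Comparator.reverseOrder());     // global score order
            // from=5, size=5 -> elements 5..9 of the merged ranking
            System.out.println(scores.subList(5, 10));  // [95.0, 94.0, 93.0, 92.0, 91.0]
        }
    }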
diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
index f3ff6be1cc..6fc795a882 100644
--- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
@@ -223,8 +223,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
- // the search context should inherit the default timeout
- assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+ try {
+ // the search context should inherit the default timeout
+ assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+ } finally {
+ contextWithDefaultTimeout.decRef();
+ service.freeContext(contextWithDefaultTimeout.id());
+ }
final long seconds = randomIntBetween(6, 10);
final SearchContext context = service.createContext(
@@ -238,8 +243,14 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
- // the search context should inherit the query timeout
- assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+ try {
+ // the search context should inherit the query timeout
+ assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+ } finally {
+ context.decRef();
+ service.freeContext(context.id());
+ }
+
}
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
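The SearchServiceTests change is a resource-safety fix: the assertions now run inside try/finally so the acquired context is always released (decRef plus freeContext) even when an assertion throws; previously a failing assertion leaked the context. The same pattern in the abstract, with illustrative names that are not part of the SearchService API:

    import java.util.function.Consumer;

    final class ReleaseOnFailure {
        interface RefCounted {
            void decRef();
        }

        // Run assertions against a ref-counted resource and always release it,
        // whether the body returns normally or throws.
        static <T extends RefCounted> void withResource(T resource, Consumer<T> body) {
            try {
                body.accept(resource);
            } finally {
                resource.decRef();
            }
        }
    }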