diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java | 299 |
1 files changed, 195 insertions, 104 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 38d793b93e..13b4b2f73d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; @@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -147,42 +147,47 @@ public final class SearchPhaseController extends AbstractComponent { * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * @param results the search phase results to obtain the sort docs from + * @param bufferedTopDocs the pre-consumed buffered top docs + * @param topDocsStats the top docs stats to fill + * @param from the offset into the search results top docs + * @param size the number of hits to return from the merged top docs */ - public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException { + public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results, + final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) { if (results.isEmpty()) { - return EMPTY_DOCS; + return SortedTopDocs.EMPTY; } - final Collection<TopDocs> topDocs = new ArrayList<>(); + final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs; final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>(); - int from = -1; - int size = -1; - for (SearchPhaseResult sortedResult : results) { + for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once /* We loop over all results once, group together the completion suggestions if there are any and collect relevant * top docs results. Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road * this allowed to remove a single shared optimization code here since now we don't materialized a dense array of * top docs anymore but instead only pass relevant results / top docs to the merge method*/ QuerySearchResult queryResult = sortedResult.queryResult(); - if (queryResult.hasHits()) { - from = queryResult.from(); - size = queryResult.size(); - TopDocs td = queryResult.topDocs(); - if (td != null && td.scoreDocs.length > 0) { + if (queryResult.hasConsumedTopDocs() == false) { // already consumed? + final TopDocs td = queryResult.consumeTopDocs(); + assert td != null; + topDocsStats.add(td); + if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet setShardIndex(td, queryResult.getShardIndex()); topDocs.add(td); } + } + if (queryResult.hasSuggestHits()) { Suggest shardSuggest = queryResult.suggest(); - if (shardSuggest != null) { - for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { - suggestion.setShardIndex(sortedResult.getShardIndex()); - List<Suggestion<CompletionSuggestion.Entry>> suggestions = - groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); - suggestions.add(suggestion); - } + for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { + suggestion.setShardIndex(sortedResult.getShardIndex()); + List<Suggestion<CompletionSuggestion.Entry>> suggestions = + groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestions.add(suggestion); } } } - if (size != -1) { - final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from); + final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false; + if (hasHits) { + final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from); + final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs; ScoreDoc[] scoreDocs = mergedScoreDocs; if (groupedCompletionSuggestions.isEmpty() == false) { int numSuggestDocs = 0; @@ -204,23 +209,35 @@ public final class SearchPhaseController extends AbstractComponent { } } } - return scoreDocs; + final boolean isSortedByField; + final SortField[] sortFields; + if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) { + TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs; + isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs && + fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false; + sortFields = fieldDocs.fields; + } else { + isSortedByField = false; + sortFields = null; + } + return new SortedTopDocs(scoreDocs, isSortedByField, sortFields); } else { - // no relevant docs - just return an empty array - return EMPTY_DOCS; + // no relevant docs + return SortedTopDocs.EMPTY; } } - private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) { + TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) { if (results.isEmpty()) { - return EMPTY_DOCS; + return null; } + assert results.isEmpty() == false; final boolean setShardIndex = false; final TopDocs topDocs = results.stream().findFirst().get(); final TopDocs mergedTopDocs; final int numShards = results.size(); if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them. - return topDocs.scoreDocs; + return topDocs; } else if (topDocs instanceof CollapseTopFieldDocs) { CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs; final Sort sort = new Sort(firstTopDocs.fields); @@ -235,7 +252,7 @@ public final class SearchPhaseController extends AbstractComponent { final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]); mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex); } - return mergedTopDocs.scoreDocs; + return mergedTopDocs; } private static void setShardIndex(TopDocs topDocs, int shardIndex) { @@ -249,12 +266,12 @@ public final class SearchPhaseController extends AbstractComponent { } } - public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, - ScoreDoc[] sortedScoreDocs, int numShards) { - ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; - if (reducedQueryPhase.isEmpty() == false) { + public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) { + final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; + if (reducedQueryPhase.isEmptyResult == false) { + final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs; // from is always zero as when we use scroll, we ignore from - long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size()); + long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size); // with collapsing we can have more hits than sorted docs size = Math.min(sortedScoreDocs.length, size); for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) { @@ -288,13 +305,13 @@ public final class SearchPhaseController extends AbstractComponent { * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named * completion suggestion ordered by suggestion name */ - public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, - ReducedQueryPhase reducedQueryPhase, + public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase, Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) { - if (reducedQueryPhase.isEmpty()) { + if (reducedQueryPhase.isEmptyResult) { return InternalSearchResponse.empty(); } - SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup); + ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs; + SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup); if (reducedQueryPhase.suggest != null) { if (!fetchResults.isEmpty()) { int currentOffset = hits.getHits().length; @@ -329,21 +346,15 @@ public final class SearchPhaseController extends AbstractComponent { return reducedQueryPhase.buildResponse(hits); } - private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs, + private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) { - boolean sorted = false; + final boolean sorted = reducedQueryPhase.isSortedByField; + ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs; int sortScoreIndex = -1; - if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) { - TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs(); - if (fieldDocs instanceof CollapseTopFieldDocs && - fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) { - sorted = false; - } else { - sorted = true; - for (int i = 0; i < fieldDocs.fields.length; i++) { - if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) { - sortScoreIndex = i; - } + if (sorted) { + for (int i = 0; i < reducedQueryPhase.sortField.length; i++) { + if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) { + sortScoreIndex = i; } } } @@ -351,8 +362,8 @@ public final class SearchPhaseController extends AbstractComponent { for (SearchPhaseResult entry : fetchResults) { entry.fetchResult().initCounter(); } - int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from(); - int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size()); + int from = ignoreFrom ? 0 : reducedQueryPhase.from; + int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size); // with collapsing we can have more fetch hits than sorted docs numSearchHits = Math.min(sortedDocs.length, numSearchHits); // merge hits @@ -376,7 +387,7 @@ public final class SearchPhaseController extends AbstractComponent { searchHit.shard(fetchResult.getSearchShardTarget()); if (sorted) { FieldDoc fieldDoc = (FieldDoc) shardDoc; - searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats()); + searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats); if (sortScoreIndex != -1) { searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue()); } @@ -393,42 +404,42 @@ public final class SearchPhaseController extends AbstractComponent { * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results */ - public ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) { - return reducedQueryPhase(queryResults, null, 0); + public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) { + return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest); } /** * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results - * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed + * @param bufferedAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed + * from all non-null query results. + * @param bufferedTopDocs a list of pre-collected / buffered top docs. if this list is non-null all top docs have been consumed * from all non-null query results. * @param numReducePhases the number of non-final reduce phases applied to the query results. * @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeProfileResult() */ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, - List<InternalAggregations> bufferdAggs, int numReducePhases) { + List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs, + TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) { assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase - long totalHits = 0; - long fetchHits = 0; - float maxScore = Float.NEGATIVE_INFINITY; boolean timedOut = false; Boolean terminatedEarly = null; if (queryResults.isEmpty()) { // early terminate we have nothing to reduce - return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null, - numReducePhases); + return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore, + timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true); } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean consumeAggs; final List<InternalAggregations> aggregationsList; - if (bufferdAggs != null) { + if (bufferedAggs != null) { consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?"; - aggregationsList = bufferdAggs; + aggregationsList = bufferedAggs; } else if (firstResult.hasAggs()) { // the number of shards was less than the buffer size so we reduce agg results directly aggregationsList = new ArrayList<>(queryResults.size()); @@ -443,8 +454,12 @@ public final class SearchPhaseController extends AbstractComponent { final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap(); final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size()) : Collections.emptyMap(); + int from = 0; + int size = 0; for (SearchPhaseResult entry : queryResults) { QuerySearchResult result = entry.queryResult(); + from = result.from(); + size = result.size(); if (result.searchTimedOut()) { timedOut = true; } @@ -455,11 +470,6 @@ public final class SearchPhaseController extends AbstractComponent { terminatedEarly = true; } } - totalHits += result.topDocs().totalHits; - fetchHits += result.topDocs().scoreDocs.length; - if (!Float.isNaN(result.topDocs().getMaxScore())) { - maxScore = Math.max(maxScore, result.topDocs().getMaxScore()); - } if (hasSuggest) { assert result.suggest() != null; for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) { @@ -480,8 +490,11 @@ public final class SearchPhaseController extends AbstractComponent { final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, firstResult.pipelineAggregators(), reduceContext); final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); - return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations, - shardResults, numReducePhases); + final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size); + return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore, + timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields, + firstResult != null ? firstResult.sortValueFormats() : null, + numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null); } @@ -522,8 +535,6 @@ public final class SearchPhaseController extends AbstractComponent { final boolean timedOut; // non null and true if at least one reduced result was terminated early final Boolean terminatedEarly; - // an non-null arbitrary query result if was at least one reduced result - final QuerySearchResult oneResult; // the reduced suggest results final Suggest suggest; // the reduced internal aggregations @@ -532,10 +543,25 @@ public final class SearchPhaseController extends AbstractComponent { final SearchProfileShardResults shardResults; // the number of reduces phases final int numReducePhases; - - ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, - QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations, - SearchProfileShardResults shardResults, int numReducePhases) { + // the searches merged top docs + final ScoreDoc[] scoreDocs; + // the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted + final SortField[] sortField; + // <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set. + final boolean isSortedByField; + // the size of the top hits to return + final int size; + // <code>true</code> iff the query phase had no results. Otherwise <code>false</code> + final boolean isEmptyResult; + // the offset into the merged top hits + final int from; + // sort value formats used to sort / format the result + final DocValueFormat[] sortValueFormats; + + ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest, + InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs, + SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size, + int from, boolean isEmptyResult) { if (numReducePhases <= 0) { throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases); } @@ -548,27 +574,26 @@ public final class SearchPhaseController extends AbstractComponent { } this.timedOut = timedOut; this.terminatedEarly = terminatedEarly; - this.oneResult = oneResult; this.suggest = suggest; this.aggregations = aggregations; this.shardResults = shardResults; this.numReducePhases = numReducePhases; + this.scoreDocs = scoreDocs; + this.sortField = sortFields; + this.isSortedByField = isSortedByField; + this.size = size; + this.from = from; + this.isEmptyResult = isEmptyResult; + this.sortValueFormats = sortValueFormats; } /** * Creates a new search response from the given merged hits. - * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction) + * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction) */ public InternalSearchResponse buildResponse(SearchHits hits) { return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases); } - - /** - * Returns <code>true</code> iff the query phase had no results. Otherwise <code>false</code> - */ - public boolean isEmpty() { - return oneResult == null; - } } /** @@ -577,12 +602,16 @@ public final class SearchPhaseController extends AbstractComponent { * This implementation can be configured to batch up a certain amount of results and only reduce them * iff the buffer is exhausted. */ - static final class QueryPhaseResultConsumer - extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> { - private final InternalAggregations[] buffer; + static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> { + private final InternalAggregations[] aggsBuffer; + private final TopDocs[] topDocsBuffer; + private final boolean hasAggs; + private final boolean hasTopDocs; + private final int bufferSize; private int index; private final SearchPhaseController controller; private int numReducePhases = 0; + private final TopDocsStats topDocsStats = new TopDocsStats(); /** * Creates a new {@link QueryPhaseResultConsumer} @@ -591,7 +620,8 @@ public final class SearchPhaseController extends AbstractComponent { * @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results * the buffer is used to incrementally reduce aggregation results before all shards responded. */ - private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) { + private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize, + boolean hasTopDocs, boolean hasAggs) { super(expectedResultSize); if (expectedResultSize != 1 && bufferSize < 2) { throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result"); @@ -599,39 +629,68 @@ public final class SearchPhaseController extends AbstractComponent { if (expectedResultSize <= bufferSize) { throw new IllegalArgumentException("buffer size must be less than the expected result size"); } + if (hasAggs == false && hasTopDocs == false) { + throw new IllegalArgumentException("either aggs or top docs must be present"); + } this.controller = controller; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. - this.buffer = new InternalAggregations[bufferSize]; + this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; + this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; + this.hasTopDocs = hasTopDocs; + this.hasAggs = hasAggs; + this.bufferSize = bufferSize; + } @Override public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); - assert queryResult.hasAggs() : "this collector should only be used if aggs are requested"; consumeInternal(queryResult); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { - InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs(); - if (index == buffer.length) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer)); - Arrays.fill(buffer, null); + if (index == bufferSize) { + if (hasAggs) { + InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + Arrays.fill(aggsBuffer, null); + aggsBuffer[0] = reducedAggs; + } + if (hasTopDocs) { + TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer), + querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard + , 0); + Arrays.fill(topDocsBuffer, null); + topDocsBuffer[0] = reducedTopDocs; + } numReducePhases++; - buffer[0] = reducedAggs; index = 1; } final int i = index++; - buffer[i] = aggregations; + if (hasAggs) { + aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); + } + if (hasTopDocs) { + final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null + topDocsStats.add(topDocs); + SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex()); + topDocsBuffer[i] = topDocs; + } + } + + private synchronized List<InternalAggregations> getRemainingAggs() { + return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null; } - private synchronized List<InternalAggregations> getRemaining() { - return Arrays.asList(buffer).subList(0, index); + private synchronized List<TopDocs> getRemainingTopDocs() { + return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null; } + @Override public ReducedQueryPhase reduce() { - return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases); + return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, + numReducePhases, false); } /** @@ -649,17 +708,49 @@ public final class SearchPhaseController extends AbstractComponent { */ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) { SearchSourceBuilder source = request.source(); - if (source != null && source.aggregations() != null) { + boolean isScrollRequest = request.scroll() != null; + final boolean hasAggs = source != null && source.aggregations() != null; + final boolean hasTopDocs = source == null || source.size() != 0; + + if (isScrollRequest == false && (hasAggs || hasTopDocs)) { + // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize()); + return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs); } } return new InitialSearchPhase.SearchPhaseResults(numShards) { @Override public ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList()); + return reducedQueryPhase(results.asList(), isScrollRequest); } }; } + + static final class TopDocsStats { + long totalHits; + long fetchHits; + float maxScore = Float.NEGATIVE_INFINITY; + + void add(TopDocs topDocs) { + totalHits += topDocs.totalHits; + fetchHits += topDocs.scoreDocs.length; + if (!Float.isNaN(topDocs.getMaxScore())) { + maxScore = Math.max(maxScore, topDocs.getMaxScore()); + } + } + } + + static final class SortedTopDocs { + static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null); + final ScoreDoc[] scoreDocs; + final boolean isSortedByField; + final SortField[] sortFields; + + SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) { + this.scoreDocs = scoreDocs; + this.isSortedByField = isSortedByField; + this.sortFields = sortFields; + } + } } |