summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java299
1 files changed, 195 insertions, 104 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 38d793b93e..13b4b2f73d 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
@@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -147,42 +147,47 @@ public final class SearchPhaseController extends AbstractComponent {
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param results the search phase results to obtain the sort docs from
+ * @param bufferedTopDocs the pre-consumed buffered top docs
+ * @param topDocsStats the top docs stats to fill
+ * @param from the offset into the search results top docs
+ * @param size the number of hits to return from the merged top docs
*/
- public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException {
+ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+ final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return SortedTopDocs.EMPTY;
}
- final Collection<TopDocs> topDocs = new ArrayList<>();
+ final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
- int from = -1;
- int size = -1;
- for (SearchPhaseResult sortedResult : results) {
+ for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once
/* We loop over all results once, group together the completion suggestions if there are any and collect relevant
* top docs results. Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road
* this allowed to remove a single shared optimization code here since now we don't materialized a dense array of
* top docs anymore but instead only pass relevant results / top docs to the merge method*/
QuerySearchResult queryResult = sortedResult.queryResult();
- if (queryResult.hasHits()) {
- from = queryResult.from();
- size = queryResult.size();
- TopDocs td = queryResult.topDocs();
- if (td != null && td.scoreDocs.length > 0) {
+ if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
+ final TopDocs td = queryResult.consumeTopDocs();
+ assert td != null;
+ topDocsStats.add(td);
+ if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet
setShardIndex(td, queryResult.getShardIndex());
topDocs.add(td);
}
+ }
+ if (queryResult.hasSuggestHits()) {
Suggest shardSuggest = queryResult.suggest();
- if (shardSuggest != null) {
- for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
- suggestion.setShardIndex(sortedResult.getShardIndex());
- List<Suggestion<CompletionSuggestion.Entry>> suggestions =
- groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
- suggestions.add(suggestion);
- }
+ for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+ suggestion.setShardIndex(sortedResult.getShardIndex());
+ List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+ groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggestions.add(suggestion);
}
}
}
- if (size != -1) {
- final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
+ if (hasHits) {
+ final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs;
ScoreDoc[] scoreDocs = mergedScoreDocs;
if (groupedCompletionSuggestions.isEmpty() == false) {
int numSuggestDocs = 0;
@@ -204,23 +209,35 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
}
- return scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+ if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+ TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
+ isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
+ fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
+ sortFields = fieldDocs.fields;
+ } else {
+ isSortedByField = false;
+ sortFields = null;
+ }
+ return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
} else {
- // no relevant docs - just return an empty array
- return EMPTY_DOCS;
+ // no relevant docs
+ return SortedTopDocs.EMPTY;
}
}
- private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return null;
}
+ assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
final int numShards = results.size();
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
- return topDocs.scoreDocs;
+ return topDocs;
} else if (topDocs instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs;
final Sort sort = new Sort(firstTopDocs.fields);
@@ -235,7 +252,7 @@ public final class SearchPhaseController extends AbstractComponent {
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex);
}
- return mergedTopDocs.scoreDocs;
+ return mergedTopDocs;
}
private static void setShardIndex(TopDocs topDocs, int shardIndex) {
@@ -249,12 +266,12 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
- public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
- ScoreDoc[] sortedScoreDocs, int numShards) {
- ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
- if (reducedQueryPhase.isEmpty() == false) {
+ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
+ final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
+ if (reducedQueryPhase.isEmptyResult == false) {
+ final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
// from is always zero as when we use scroll, we ignore from
- long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
+ long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
// with collapsing we can have more hits than sorted docs
size = Math.min(sortedScoreDocs.length, size);
for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -288,13 +305,13 @@ public final class SearchPhaseController extends AbstractComponent {
* Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
* completion suggestion ordered by suggestion name
*/
- public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
- ReducedQueryPhase reducedQueryPhase,
+ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- if (reducedQueryPhase.isEmpty()) {
+ if (reducedQueryPhase.isEmptyResult) {
return InternalSearchResponse.empty();
}
- SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup);
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
+ SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) {
int currentOffset = hits.getHits().length;
@@ -329,21 +346,15 @@ public final class SearchPhaseController extends AbstractComponent {
return reducedQueryPhase.buildResponse(hits);
}
- private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- boolean sorted = false;
+ final boolean sorted = reducedQueryPhase.isSortedByField;
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
int sortScoreIndex = -1;
- if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
- TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
- if (fieldDocs instanceof CollapseTopFieldDocs &&
- fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
- sorted = false;
- } else {
- sorted = true;
- for (int i = 0; i < fieldDocs.fields.length; i++) {
- if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
- sortScoreIndex = i;
- }
+ if (sorted) {
+ for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
+ if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
+ sortScoreIndex = i;
}
}
}
@@ -351,8 +362,8 @@ public final class SearchPhaseController extends AbstractComponent {
for (SearchPhaseResult entry : fetchResults) {
entry.fetchResult().initCounter();
}
- int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
- int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
+ int from = ignoreFrom ? 0 : reducedQueryPhase.from;
+ int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
// with collapsing we can have more fetch hits than sorted docs
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
// merge hits
@@ -376,7 +387,7 @@ public final class SearchPhaseController extends AbstractComponent {
searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
- searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
+ searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (sortScoreIndex != -1) {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
@@ -393,42 +404,42 @@ public final class SearchPhaseController extends AbstractComponent {
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
- public ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) {
- return reducedQueryPhase(queryResults, null, 0);
+ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
+ return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
- * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+ * @param bufferedAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+ * from all non-null query results.
+ * @param bufferedTopDocs a list of pre-collected / buffered top docs. if this list is non-null all top docs have been consumed
* from all non-null query results.
* @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
- List<InternalAggregations> bufferdAggs, int numReducePhases) {
+ List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
+ TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
- long totalHits = 0;
- long fetchHits = 0;
- float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
- numReducePhases);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs;
final List<InternalAggregations> aggregationsList;
- if (bufferdAggs != null) {
+ if (bufferedAggs != null) {
consumeAggs = false;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
- aggregationsList = bufferdAggs;
+ aggregationsList = bufferedAggs;
} else if (firstResult.hasAggs()) {
// the number of shards was less than the buffer size so we reduce agg results directly
aggregationsList = new ArrayList<>(queryResults.size());
@@ -443,8 +454,12 @@ public final class SearchPhaseController extends AbstractComponent {
final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
: Collections.emptyMap();
+ int from = 0;
+ int size = 0;
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
+ from = result.from();
+ size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
@@ -455,11 +470,6 @@ public final class SearchPhaseController extends AbstractComponent {
terminatedEarly = true;
}
}
- totalHits += result.topDocs().totalHits;
- fetchHits += result.topDocs().scoreDocs.length;
- if (!Float.isNaN(result.topDocs().getMaxScore())) {
- maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
- }
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -480,8 +490,11 @@ public final class SearchPhaseController extends AbstractComponent {
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
- shardResults, numReducePhases);
+ final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
+ firstResult != null ? firstResult.sortValueFormats() : null,
+ numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
}
@@ -522,8 +535,6 @@ public final class SearchPhaseController extends AbstractComponent {
final boolean timedOut;
// non null and true if at least one reduced result was terminated early
final Boolean terminatedEarly;
- // an non-null arbitrary query result if was at least one reduced result
- final QuerySearchResult oneResult;
// the reduced suggest results
final Suggest suggest;
// the reduced internal aggregations
@@ -532,10 +543,25 @@ public final class SearchPhaseController extends AbstractComponent {
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
-
- ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
- QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
- SearchProfileShardResults shardResults, int numReducePhases) {
+ // the searches merged top docs
+ final ScoreDoc[] scoreDocs;
+ // the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
+ final SortField[] sortField;
+ // <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
+ final boolean isSortedByField;
+ // the size of the top hits to return
+ final int size;
+ // <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
+ final boolean isEmptyResult;
+ // the offset into the merged top hits
+ final int from;
+ // sort value formats used to sort / format the result
+ final DocValueFormat[] sortValueFormats;
+
+ ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
+ InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
+ SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
+ int from, boolean isEmptyResult) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
@@ -548,27 +574,26 @@ public final class SearchPhaseController extends AbstractComponent {
}
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
- this.oneResult = oneResult;
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
+ this.scoreDocs = scoreDocs;
+ this.sortField = sortFields;
+ this.isSortedByField = isSortedByField;
+ this.size = size;
+ this.from = from;
+ this.isEmptyResult = isEmptyResult;
+ this.sortValueFormats = sortValueFormats;
}
/**
* Creates a new search response from the given merged hits.
- * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction)
+ * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
-
- /**
- * Returns <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
- */
- public boolean isEmpty() {
- return oneResult == null;
- }
}
/**
@@ -577,12 +602,16 @@ public final class SearchPhaseController extends AbstractComponent {
* This implementation can be configured to batch up a certain amount of results and only reduce them
* iff the buffer is exhausted.
*/
- static final class QueryPhaseResultConsumer
- extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
- private final InternalAggregations[] buffer;
+ static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
+ private final InternalAggregations[] aggsBuffer;
+ private final TopDocs[] topDocsBuffer;
+ private final boolean hasAggs;
+ private final boolean hasTopDocs;
+ private final int bufferSize;
private int index;
private final SearchPhaseController controller;
private int numReducePhases = 0;
+ private final TopDocsStats topDocsStats = new TopDocsStats();
/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -591,7 +620,8 @@ public final class SearchPhaseController extends AbstractComponent {
* @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
* the buffer is used to incrementally reduce aggregation results before all shards responded.
*/
- private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) {
+ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
+ boolean hasTopDocs, boolean hasAggs) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -599,39 +629,68 @@ public final class SearchPhaseController extends AbstractComponent {
if (expectedResultSize <= bufferSize) {
throw new IllegalArgumentException("buffer size must be less than the expected result size");
}
+ if (hasAggs == false && hasTopDocs == false) {
+ throw new IllegalArgumentException("either aggs or top docs must be present");
+ }
this.controller = controller;
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
- this.buffer = new InternalAggregations[bufferSize];
+ this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
+ this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
+ this.hasTopDocs = hasTopDocs;
+ this.hasAggs = hasAggs;
+ this.bufferSize = bufferSize;
+
}
@Override
public void consumeResult(SearchPhaseResult result) {
super.consumeResult(result);
QuerySearchResult queryResult = result.queryResult();
- assert queryResult.hasAggs() : "this collector should only be used if aggs are requested";
consumeInternal(queryResult);
}
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
- InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
- if (index == buffer.length) {
- InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
- Arrays.fill(buffer, null);
+ if (index == bufferSize) {
+ if (hasAggs) {
+ InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
+ Arrays.fill(aggsBuffer, null);
+ aggsBuffer[0] = reducedAggs;
+ }
+ if (hasTopDocs) {
+ TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+ querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard
+ , 0);
+ Arrays.fill(topDocsBuffer, null);
+ topDocsBuffer[0] = reducedTopDocs;
+ }
numReducePhases++;
- buffer[0] = reducedAggs;
index = 1;
}
final int i = index++;
- buffer[i] = aggregations;
+ if (hasAggs) {
+ aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
+ }
+ if (hasTopDocs) {
+ final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null
+ topDocsStats.add(topDocs);
+ SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex());
+ topDocsBuffer[i] = topDocs;
+ }
+ }
+
+ private synchronized List<InternalAggregations> getRemainingAggs() {
+ return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
}
- private synchronized List<InternalAggregations> getRemaining() {
- return Arrays.asList(buffer).subList(0, index);
+ private synchronized List<TopDocs> getRemainingTopDocs() {
+ return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}
+
@Override
public ReducedQueryPhase reduce() {
- return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
+ return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
+ numReducePhases, false);
}
/**
@@ -649,17 +708,49 @@ public final class SearchPhaseController extends AbstractComponent {
*/
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
- if (source != null && source.aggregations() != null) {
+ boolean isScrollRequest = request.scroll() != null;
+ final boolean hasAggs = source != null && source.aggregations() != null;
+ final boolean hasTopDocs = source == null || source.size() != 0;
+
+ if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
+ // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
- return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
+ return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
@Override
public ReducedQueryPhase reduce() {
- return reducedQueryPhase(results.asList());
+ return reducedQueryPhase(results.asList(), isScrollRequest);
}
};
}
+
+ static final class TopDocsStats {
+ long totalHits;
+ long fetchHits;
+ float maxScore = Float.NEGATIVE_INFINITY;
+
+ void add(TopDocs topDocs) {
+ totalHits += topDocs.totalHits;
+ fetchHits += topDocs.scoreDocs.length;
+ if (!Float.isNaN(topDocs.getMaxScore())) {
+ maxScore = Math.max(maxScore, topDocs.getMaxScore());
+ }
+ }
+ }
+
+ static final class SortedTopDocs {
+ static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
+ final ScoreDoc[] scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+
+ SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
+ this.scoreDocs = scoreDocs;
+ this.isSortedByField = isSortedByField;
+ this.sortFields = sortFields;
+ }
+ }
}