summaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java15
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java299
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java5
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java18
-rw-r--r--core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java9
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchService.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java2
-rw-r--r--core/src/main/java/org/elasticsearch/search/query/QueryPhase.java59
-rw-r--r--core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java60
-rw-r--r--core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java227
-rw-r--r--core/src/test/java/org/elasticsearch/search/SearchServiceTests.java19
11 files changed, 490 insertions, 227 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
index 920dd1b000..a0e313f1d7 100644
--- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
+++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
@@ -98,27 +98,26 @@ final class FetchSearchPhase extends SearchPhase {
final int numShards = context.getNumShards();
final boolean isScrollSearch = context.getRequest().scroll() != null;
List<SearchPhaseResult> phaseResults = queryResults.asList();
- ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults);
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
final boolean queryAndFetchOptimization = queryResults.length() == 1;
final Runnable finishPhase = ()
- -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
+ -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
// query AND fetch optimization
finishPhase.run();
} else {
- final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
- if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+ final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
+ if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(e -> e.queryResult())
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run();
} else {
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
- searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
+ searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
@@ -188,7 +187,7 @@ final class FetchSearchPhase extends SearchPhase {
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at lease one hit that didn't make it to the global topDocs
- if (context.getRequest().scroll() == null && queryResult.hasHits()) {
+ if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
try {
Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
@@ -198,11 +197,11 @@ final class FetchSearchPhase extends SearchPhase {
}
}
- private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
+ private void moveToNextPhase(SearchPhaseController searchPhaseController,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
- sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
+ reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 38d793b93e..13b4b2f73d 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
@@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -147,42 +147,47 @@ public final class SearchPhaseController extends AbstractComponent {
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param results the search phase results to obtain the sort docs from
+ * @param bufferedTopDocs the pre-consumed buffered top docs
+ * @param topDocsStats the top docs stats to fill
+ * @param from the offset into the search results top docs
+ * @param size the number of hits to return from the merged top docs
*/
- public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException {
+ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+ final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return SortedTopDocs.EMPTY;
}
- final Collection<TopDocs> topDocs = new ArrayList<>();
+ final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
- int from = -1;
- int size = -1;
- for (SearchPhaseResult sortedResult : results) {
+ for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once
/* We loop over all results once, group together the completion suggestions if there are any and collect relevant
* top docs results. Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road
* this allowed to remove a single shared optimization code here since now we don't materialized a dense array of
* top docs anymore but instead only pass relevant results / top docs to the merge method*/
QuerySearchResult queryResult = sortedResult.queryResult();
- if (queryResult.hasHits()) {
- from = queryResult.from();
- size = queryResult.size();
- TopDocs td = queryResult.topDocs();
- if (td != null && td.scoreDocs.length > 0) {
+ if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
+ final TopDocs td = queryResult.consumeTopDocs();
+ assert td != null;
+ topDocsStats.add(td);
+ if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet
setShardIndex(td, queryResult.getShardIndex());
topDocs.add(td);
}
+ }
+ if (queryResult.hasSuggestHits()) {
Suggest shardSuggest = queryResult.suggest();
- if (shardSuggest != null) {
- for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
- suggestion.setShardIndex(sortedResult.getShardIndex());
- List<Suggestion<CompletionSuggestion.Entry>> suggestions =
- groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
- suggestions.add(suggestion);
- }
+ for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+ suggestion.setShardIndex(sortedResult.getShardIndex());
+ List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+ groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggestions.add(suggestion);
}
}
}
- if (size != -1) {
- final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
+ if (hasHits) {
+ final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+ final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs;
ScoreDoc[] scoreDocs = mergedScoreDocs;
if (groupedCompletionSuggestions.isEmpty() == false) {
int numSuggestDocs = 0;
@@ -204,23 +209,35 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
}
- return scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+ if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+ TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
+ isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
+ fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
+ sortFields = fieldDocs.fields;
+ } else {
+ isSortedByField = false;
+ sortFields = null;
+ }
+ return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
} else {
- // no relevant docs - just return an empty array
- return EMPTY_DOCS;
+ // no relevant docs
+ return SortedTopDocs.EMPTY;
}
}
- private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
- return EMPTY_DOCS;
+ return null;
}
+ assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
final int numShards = results.size();
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
- return topDocs.scoreDocs;
+ return topDocs;
} else if (topDocs instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs;
final Sort sort = new Sort(firstTopDocs.fields);
@@ -235,7 +252,7 @@ public final class SearchPhaseController extends AbstractComponent {
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex);
}
- return mergedTopDocs.scoreDocs;
+ return mergedTopDocs;
}
private static void setShardIndex(TopDocs topDocs, int shardIndex) {
@@ -249,12 +266,12 @@ public final class SearchPhaseController extends AbstractComponent {
}
}
- public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
- ScoreDoc[] sortedScoreDocs, int numShards) {
- ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
- if (reducedQueryPhase.isEmpty() == false) {
+ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
+ final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
+ if (reducedQueryPhase.isEmptyResult == false) {
+ final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
// from is always zero as when we use scroll, we ignore from
- long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
+ long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
// with collapsing we can have more hits than sorted docs
size = Math.min(sortedScoreDocs.length, size);
for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -288,13 +305,13 @@ public final class SearchPhaseController extends AbstractComponent {
* Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
* completion suggestion ordered by suggestion name
*/
- public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
- ReducedQueryPhase reducedQueryPhase,
+ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- if (reducedQueryPhase.isEmpty()) {
+ if (reducedQueryPhase.isEmptyResult) {
return InternalSearchResponse.empty();
}
- SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup);
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
+ SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) {
int currentOffset = hits.getHits().length;
@@ -329,21 +346,15 @@ public final class SearchPhaseController extends AbstractComponent {
return reducedQueryPhase.buildResponse(hits);
}
- private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
- boolean sorted = false;
+ final boolean sorted = reducedQueryPhase.isSortedByField;
+ ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
int sortScoreIndex = -1;
- if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
- TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
- if (fieldDocs instanceof CollapseTopFieldDocs &&
- fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
- sorted = false;
- } else {
- sorted = true;
- for (int i = 0; i < fieldDocs.fields.length; i++) {
- if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
- sortScoreIndex = i;
- }
+ if (sorted) {
+ for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
+ if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
+ sortScoreIndex = i;
}
}
}
@@ -351,8 +362,8 @@ public final class SearchPhaseController extends AbstractComponent {
for (SearchPhaseResult entry : fetchResults) {
entry.fetchResult().initCounter();
}
- int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
- int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
+ int from = ignoreFrom ? 0 : reducedQueryPhase.from;
+ int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
// with collapsing we can have more fetch hits than sorted docs
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
// merge hits
@@ -376,7 +387,7 @@ public final class SearchPhaseController extends AbstractComponent {
searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
- searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
+ searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (sortScoreIndex != -1) {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
@@ -393,42 +404,42 @@ public final class SearchPhaseController extends AbstractComponent {
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
- public ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) {
- return reducedQueryPhase(queryResults, null, 0);
+ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
+ return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
- * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+ * @param bufferedAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+ * from all non-null query results.
+ * @param bufferedTopDocs a list of pre-collected / buffered top docs. if this list is non-null all top docs have been consumed
* from all non-null query results.
* @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
- List<InternalAggregations> bufferdAggs, int numReducePhases) {
+ List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
+ TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
- long totalHits = 0;
- long fetchHits = 0;
- float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
- numReducePhases);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs;
final List<InternalAggregations> aggregationsList;
- if (bufferdAggs != null) {
+ if (bufferedAggs != null) {
consumeAggs = false;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
- aggregationsList = bufferdAggs;
+ aggregationsList = bufferedAggs;
} else if (firstResult.hasAggs()) {
// the number of shards was less than the buffer size so we reduce agg results directly
aggregationsList = new ArrayList<>(queryResults.size());
@@ -443,8 +454,12 @@ public final class SearchPhaseController extends AbstractComponent {
final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
: Collections.emptyMap();
+ int from = 0;
+ int size = 0;
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
+ from = result.from();
+ size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
@@ -455,11 +470,6 @@ public final class SearchPhaseController extends AbstractComponent {
terminatedEarly = true;
}
}
- totalHits += result.topDocs().totalHits;
- fetchHits += result.topDocs().scoreDocs.length;
- if (!Float.isNaN(result.topDocs().getMaxScore())) {
- maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
- }
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -480,8 +490,11 @@ public final class SearchPhaseController extends AbstractComponent {
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
- shardResults, numReducePhases);
+ final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+ return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+ timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
+ firstResult != null ? firstResult.sortValueFormats() : null,
+ numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
}
@@ -522,8 +535,6 @@ public final class SearchPhaseController extends AbstractComponent {
final boolean timedOut;
// non null and true if at least one reduced result was terminated early
final Boolean terminatedEarly;
- // an non-null arbitrary query result if was at least one reduced result
- final QuerySearchResult oneResult;
// the reduced suggest results
final Suggest suggest;
// the reduced internal aggregations
@@ -532,10 +543,25 @@ public final class SearchPhaseController extends AbstractComponent {
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
-
- ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
- QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
- SearchProfileShardResults shardResults, int numReducePhases) {
+ // the searches merged top docs
+ final ScoreDoc[] scoreDocs;
+ // the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
+ final SortField[] sortField;
+ // <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
+ final boolean isSortedByField;
+ // the size of the top hits to return
+ final int size;
+ // <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
+ final boolean isEmptyResult;
+ // the offset into the merged top hits
+ final int from;
+ // sort value formats used to sort / format the result
+ final DocValueFormat[] sortValueFormats;
+
+ ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
+ InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
+ SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
+ int from, boolean isEmptyResult) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
@@ -548,27 +574,26 @@ public final class SearchPhaseController extends AbstractComponent {
}
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
- this.oneResult = oneResult;
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
+ this.scoreDocs = scoreDocs;
+ this.sortField = sortFields;
+ this.isSortedByField = isSortedByField;
+ this.size = size;
+ this.from = from;
+ this.isEmptyResult = isEmptyResult;
+ this.sortValueFormats = sortValueFormats;
}
/**
* Creates a new search response from the given merged hits.
- * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction)
+ * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
-
- /**
- * Returns <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
- */
- public boolean isEmpty() {
- return oneResult == null;
- }
}
/**
@@ -577,12 +602,16 @@ public final class SearchPhaseController extends AbstractComponent {
* This implementation can be configured to batch up a certain amount of results and only reduce them
* iff the buffer is exhausted.
*/
- static final class QueryPhaseResultConsumer
- extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
- private final InternalAggregations[] buffer;
+ static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
+ private final InternalAggregations[] aggsBuffer;
+ private final TopDocs[] topDocsBuffer;
+ private final boolean hasAggs;
+ private final boolean hasTopDocs;
+ private final int bufferSize;
private int index;
private final SearchPhaseController controller;
private int numReducePhases = 0;
+ private final TopDocsStats topDocsStats = new TopDocsStats();
/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -591,7 +620,8 @@ public final class SearchPhaseController extends AbstractComponent {
* @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
* the buffer is used to incrementally reduce aggregation results before all shards responded.
*/
- private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) {
+ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
+ boolean hasTopDocs, boolean hasAggs) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -599,39 +629,68 @@ public final class SearchPhaseController extends AbstractComponent {
if (expectedResultSize <= bufferSize) {
throw new IllegalArgumentException("buffer size must be less than the expected result size");
}
+ if (hasAggs == false && hasTopDocs == false) {
+ throw new IllegalArgumentException("either aggs or top docs must be present");
+ }
this.controller = controller;
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
- this.buffer = new InternalAggregations[bufferSize];
+ this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
+ this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
+ this.hasTopDocs = hasTopDocs;
+ this.hasAggs = hasAggs;
+ this.bufferSize = bufferSize;
+
}
@Override
public void consumeResult(SearchPhaseResult result) {
super.consumeResult(result);
QuerySearchResult queryResult = result.queryResult();
- assert queryResult.hasAggs() : "this collector should only be used if aggs are requested";
consumeInternal(queryResult);
}
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
- InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
- if (index == buffer.length) {
- InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
- Arrays.fill(buffer, null);
+ if (index == bufferSize) {
+ if (hasAggs) {
+ InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
+ Arrays.fill(aggsBuffer, null);
+ aggsBuffer[0] = reducedAggs;
+ }
+ if (hasTopDocs) {
+ TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+ querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard
+ , 0);
+ Arrays.fill(topDocsBuffer, null);
+ topDocsBuffer[0] = reducedTopDocs;
+ }
numReducePhases++;
- buffer[0] = reducedAggs;
index = 1;
}
final int i = index++;
- buffer[i] = aggregations;
+ if (hasAggs) {
+ aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
+ }
+ if (hasTopDocs) {
+ final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null
+ topDocsStats.add(topDocs);
+ SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex());
+ topDocsBuffer[i] = topDocs;
+ }
+ }
+
+ private synchronized List<InternalAggregations> getRemainingAggs() {
+ return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
}
- private synchronized List<InternalAggregations> getRemaining() {
- return Arrays.asList(buffer).subList(0, index);
+ private synchronized List<TopDocs> getRemainingTopDocs() {
+ return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}
+
@Override
public ReducedQueryPhase reduce() {
- return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
+ return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
+ numReducePhases, false);
}
/**
@@ -649,17 +708,49 @@ public final class SearchPhaseController extends AbstractComponent {
*/
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
- if (source != null && source.aggregations() != null) {
+ boolean isScrollRequest = request.scroll() != null;
+ final boolean hasAggs = source != null && source.aggregations() != null;
+ final boolean hasTopDocs = source == null || source.size() != 0;
+
+ if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
+ // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
- return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
+ return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
@Override
public ReducedQueryPhase reduce() {
- return reducedQueryPhase(results.asList());
+ return reducedQueryPhase(results.asList(), isScrollRequest);
}
};
}
+
+ static final class TopDocsStats {
+ long totalHits;
+ long fetchHits;
+ float maxScore = Float.NEGATIVE_INFINITY;
+
+ void add(TopDocs topDocs) {
+ totalHits += topDocs.totalHits;
+ fetchHits += topDocs.scoreDocs.length;
+ if (!Float.isNaN(topDocs.getMaxScore())) {
+ maxScore = Math.max(maxScore, topDocs.getMaxScore());
+ }
+ }
+ }
+
+ static final class SortedTopDocs {
+ static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
+ final ScoreDoc[] scoreDocs;
+ final boolean isSortedByField;
+ final SortField[] sortFields;
+
+ SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
+ this.scoreDocs = scoreDocs;
+ this.isSortedByField = isSortedByField;
+ this.sortFields = sortFields;
+ }
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
index c39a9fe6f2..b3ebaed3cb 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
@@ -173,9 +173,8 @@ final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private void innerFinishHim() throws Exception {
List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
- ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList());
- final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
- searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true,
+ searchPhaseController.reducedQueryPhase(queryFetchSearchResults, true), queryFetchSearchResults, queryFetchResults::get);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
index 37071485a0..709738dcaf 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
@@ -55,7 +55,6 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
- private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
@@ -171,16 +170,15 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
private void executeFetchPhase() throws Exception {
- sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList());
- if (sortedShardDocs.length == 0) {
- finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
+ final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(),
+ true);
+ if (reducedQueryPhase.scoreDocs.length == 0) {
+ finishHim(reducedQueryPhase);
return;
}
- final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
- SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
- final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
- queryResults.length());
+ final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), reducedQueryPhase.scoreDocs);
+ final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, queryResults.length());
final CountDown counter = new CountDown(docIdsToLoad.length);
for (int i = 0; i < docIdsToLoad.length; i++) {
final int index = i;
@@ -222,8 +220,8 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
try {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase,
- fetchResults.asList(), fetchResults::get);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
+ fetchResults::get);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
index 2bf5e50a1c..fa82aa0ac6 100644
--- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
+++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
@@ -31,14 +31,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* to get the concrete values as a list using {@link #asList()}.
*/
public class AtomicArray<E> {
-
- private static final AtomicArray EMPTY = new AtomicArray(0);
-
- @SuppressWarnings("unchecked")
- public static <E> E empty() {
- return (E) EMPTY;
- }
-
private final AtomicReferenceArray<E> array;
private volatile List<E> nonNullList;
@@ -53,7 +45,6 @@ public class AtomicArray<E> {
return array.length();
}
-
/**
* Sets the element at position {@code i} to the given value.
*
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java
index a035228195..e601cec0fe 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -259,7 +259,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
loadOrExecuteQueryPhase(request, context);
- if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+ if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
@@ -341,7 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
- if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+ if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
index 83af0b9abd..97f2681252 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -166,7 +166,7 @@ public class FetchPhase implements SearchPhase {
fetchSubPhase.hitsExecute(context, hits);
}
- context.fetchResult().hits(new SearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
+ context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
}
private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
index 13f32f74d0..272c57fe98 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
@@ -142,7 +142,6 @@ public class QueryPhase implements SearchPhase {
queryResult.searchTimedOut(false);
final boolean doProfile = searchContext.getProfilers() != null;
- final SearchType searchType = searchContext.searchType();
boolean rescore = false;
try {
queryResult.from(searchContext.from());
@@ -165,12 +164,7 @@ public class QueryPhase implements SearchPhase {
if (searchContext.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
}
- topDocsCallable = new Callable<TopDocs>() {
- @Override
- public TopDocs call() throws Exception {
- return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
- }
- };
+ topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
} else {
// Perhaps have a dedicated scroll phase?
final ScrollContext scrollContext = searchContext.scrollContext();
@@ -238,38 +232,35 @@ public class QueryPhase implements SearchPhase {
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
}
- topDocsCallable = new Callable<TopDocs>() {
- @Override
- public TopDocs call() throws Exception {
- final TopDocs topDocs;
- if (topDocsCollector instanceof TopDocsCollector) {
- topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
- } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
- topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
+ topDocsCallable = () -> {
+ final TopDocs topDocs;
+ if (topDocsCollector instanceof TopDocsCollector) {
+ topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
+ } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
+ topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
+ } else {
+ throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
+ }
+ if (scrollContext != null) {
+ if (scrollContext.totalHits == -1) {
+ // first round
+ scrollContext.totalHits = topDocs.totalHits;
+ scrollContext.maxScore = topDocs.getMaxScore();
} else {
- throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
+ // subsequent round: the total number of hits and
+ // the maximum score were computed on the first round
+ topDocs.totalHits = scrollContext.totalHits;
+ topDocs.setMaxScore(scrollContext.maxScore);
}
- if (scrollContext != null) {
- if (scrollContext.totalHits == -1) {
- // first round
- scrollContext.totalHits = topDocs.totalHits;
- scrollContext.maxScore = topDocs.getMaxScore();
- } else {
- // subsequent round: the total number of hits and
- // the maximum score were computed on the first round
- topDocs.totalHits = scrollContext.totalHits;
- topDocs.setMaxScore(scrollContext.maxScore);
- }
- if (searchContext.request().numberOfShards() == 1) {
- // if we fetch the document in the same roundtrip, we already know the last emitted doc
- if (topDocs.scoreDocs.length > 0) {
- // set the last emitted doc
- scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
- }
+ if (searchContext.request().numberOfShards() == 1) {
+ // if we fetch the document in the same roundtrip, we already know the last emitted doc
+ if (topDocs.scoreDocs.length > 0) {
+ // set the last emitted doc
+ scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
}
}
- return topDocs;
}
+ return topDocs;
};
}
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index 15403f9967..f071c62f12 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -55,6 +55,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
private Boolean terminatedEarly = null;
private ProfileShardResult profileShardResults;
private boolean hasProfileResults;
+ private boolean hasScoreDocs;
+ private int totalHits;
+ private float maxScore;
public QuerySearchResult() {
}
@@ -87,11 +90,34 @@ public final class QuerySearchResult extends SearchPhaseResult {
}
public TopDocs topDocs() {
+ if (topDocs == null) {
+ throw new IllegalStateException("topDocs already consumed");
+ }
+ return topDocs;
+ }
+
+ /**
+ * Returns <code>true</code> iff the top docs have already been consumed.
+ */
+ public boolean hasConsumedTopDocs() {
+ return topDocs == null;
+ }
+
+ /**
+ * Returns and nulls out the top docs for this search results. This allows to free up memory once the top docs are consumed.
+ * @throws IllegalStateException if the top docs have already been consumed.
+ */
+ public TopDocs consumeTopDocs() {
+ TopDocs topDocs = this.topDocs;
+ if (topDocs == null) {
+ throw new IllegalStateException("topDocs already consumed");
+ }
+ this.topDocs = null;
return topDocs;
}
public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) {
- this.topDocs = topDocs;
+ setTopDocs(topDocs);
if (topDocs.scoreDocs.length > 0 && topDocs.scoreDocs[0] instanceof FieldDoc) {
int numFields = ((FieldDoc) topDocs.scoreDocs[0]).fields.length;
if (numFields != sortValueFormats.length) {
@@ -102,12 +128,19 @@ public final class QuerySearchResult extends SearchPhaseResult {
this.sortValueFormats = sortValueFormats;
}
+ private void setTopDocs(TopDocs topDocs) {
+ this.topDocs = topDocs;
+ hasScoreDocs = topDocs.scoreDocs.length > 0;
+ this.totalHits = topDocs.totalHits;
+ this.maxScore = topDocs.getMaxScore();
+ }
+
public DocValueFormat[] sortValueFormats() {
return sortValueFormats;
}
/**
- * Retruns <code>true</code> if this query result has unconsumed aggregations
+ * Returns <code>true</code> if this query result has unconsumed aggregations
*/
public boolean hasAggs() {
return hasAggs;
@@ -195,10 +228,15 @@ public final class QuerySearchResult extends SearchPhaseResult {
return this;
}
- /** Returns true iff the result has hits */
- public boolean hasHits() {
- return (topDocs != null && topDocs.scoreDocs.length > 0) ||
- (suggest != null && suggest.hasScoreDocs());
+ /**
+ * Returns <code>true</code> if this result has any suggest score docs
+ */
+ public boolean hasSuggestHits() {
+ return (suggest != null && suggest.hasScoreDocs());
+ }
+
+ public boolean hasSearchContext() {
+ return hasScoreDocs || hasSuggestHits();
}
public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
@@ -227,7 +265,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class);
}
}
- topDocs = readTopDocs(in);
+ setTopDocs(readTopDocs(in));
if (hasAggs = in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
@@ -278,4 +316,12 @@ public final class QuerySearchResult extends SearchPhaseResult {
out.writeOptionalBoolean(terminatedEarly);
out.writeOptionalWriteable(profileShardResults);
}
+
+ public int getTotalHits() {
+ return totalHits;
+ }
+
+ public float getMaxScore() {
+ return maxScore;
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 207183bae4..c92caef628 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.lucene.Lucene;
@@ -42,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.TestCluster;
import org.junit.Before;
import java.io.IOException;
@@ -51,12 +53,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class SearchPhaseControllerTests extends ESTestCase {
@@ -75,8 +81,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+ from, size)
+ .scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
accumulatedLength += suggestionSize;
@@ -84,48 +98,71 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}
- public void testSortIsIdempotent() throws IOException {
+ public void testSortIsIdempotent() throws Exception {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
- AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
- randomBoolean() || true);
+ long randomSeed = randomLong();
+ boolean useConstantScore = randomBoolean();
+ AtomicArray<SearchPhaseResult> results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
boolean ignoreFrom = randomBoolean();
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList());
+ Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+ int from = 0, size = 0;
+ if (first.isPresent()) {
+ from = first.get().queryResult().from();
+ size = first.get().queryResult().size();
+ }
+ SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
+
+ results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+ useConstantScore);
+ SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
+ ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+ assertEquals(sortedDocs.length, sortedDocs2.length);
+ for (int i = 0; i < sortedDocs.length; i++) {
+ assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);
+ assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex);
+ assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f);
+ }
+ assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f);
+ assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits);
+ assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits);
+ }
- ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList());
- assertArrayEquals(sortedDocs, sortedDocs2);
+ private AtomicArray<SearchPhaseResult> generateSeededQueryResults(long seed, int nShards,
+ List<CompletionSuggestion> suggestions,
+ int searchHitsSize, boolean useConstantScore) throws Exception {
+ return RandomizedContext.current().runWithPrivateRandomness(seed,
+ () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore));
}
public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
+ int maxSuggestSize = 0;
for (int i = 0; i < randomIntBetween(1, 5); i++) {
- suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+ int size = randomIntBetween(1, 20);
+ maxSuggestSize += size;
+ suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size));
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
-
- // calculate offsets and score doc array
- List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
- ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
- mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
- Suggest mergedSuggest = reducedSuggest(queryResults);
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
- if (suggestion instanceof CompletionSuggestion) {
- CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
- mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
- .map(CompletionSuggestion.Entry.Option::getDoc)
- .collect(Collectors.toList()));
- }
- }
- ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
- AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest);
- InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
- searchPhaseController.reducedQueryPhase(queryResults.asList()),
+ SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false);
+ AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
+ reducedQueryPhase.suggest);
+ InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
+ reducedQueryPhase,
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
- assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+ int suggestSize = 0;
+ for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
+ Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
+ suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
+ }
+ assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
+ assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize));
Suggest suggestResult = mergedResponse.suggest();
- for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
assertThat(suggestion, instanceOf(CompletionSuggestion.class));
if (suggestion.getEntries().get(0).getOptions().size() > 0) {
CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
@@ -209,16 +246,6 @@ public class SearchPhaseControllerTests extends ESTestCase {
.collect(Collectors.toList()));
}
- private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException {
- List<SearchPhaseResult> resultList = results.asList();
- TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
- for (int i = 0; i < resultList.size(); i++) {
- shardTopDocs[i] = resultList.get(i).queryResult().topDocs();
- }
- int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
- return TopDocs.merge(topN, shardTopDocs).scoreDocs;
- }
-
private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
@@ -309,30 +336,96 @@ public class SearchPhaseControllerTests extends ESTestCase {
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
- CountDownLatch latch = new CountDownLatch(expectedNumResults);
+ Thread[] threads = new Thread[expectedNumResults];
for (int i = 0; i < expectedNumResults; i++) {
int id = i;
- Thread t = new Thread(() -> {
+ threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
- result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(id);
+ result.size(1);
consumer.consumeResult(result);
- latch.countDown();
});
- t.start();
+ threads[i].start();
+ }
+ for (int i = 0; i < expectedNumResults; i++) {
+ threads[i].join();
}
- latch.await();
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
}
+ public void testConsumerOnlyAggs() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]);
+ InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+ DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
+ result.aggregations(aggs);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
+ assertEquals(max.get(), internalMax.getValue(), 0.0D);
+ assertEquals(0, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ }
+
+
+ public void testConsumerOnlyHits() throws InterruptedException {
+ int expectedNumResults = randomIntBetween(1, 100);
+ int bufferSize = randomIntBetween(2, 200);
+ SearchRequest request = new SearchRequest();
+ if (randomBoolean()) {
+ request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
+ }
+ request.setBatchedReduceSize(bufferSize);
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+ AtomicInteger max = new AtomicInteger();
+ for (int i = 0; i < expectedNumResults; i++) {
+ int id = i;
+ int number = randomIntBetween(1, 1000);
+ max.updateAndGet(prev -> Math.max(prev, number));
+ QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+ result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
+ result.setShardIndex(id);
+ result.size(1);
+ consumer.consumeResult(result);
+ }
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(1, reduce.scoreDocs.length);
+ assertEquals(max.get(), reduce.maxScore, 0.0f);
+ assertEquals(expectedNumResults, reduce.totalHits);
+ assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
+ }
+
+
public void testNewSearchPhaseResults() {
for (int i = 0; i < 10; i++) {
int expectedNumResults = randomIntBetween(1, 10);
@@ -342,10 +435,22 @@ public class SearchPhaseControllerTests extends ESTestCase {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
+ final boolean hasTopDocs;
+ if ((hasTopDocs = randomBoolean())) {
+ if (request.source() != null) {
+ request.source().size(randomIntBetween(1, 100));
+ } // no source means size = 10
+ } else {
+ if (request.source() == null) {
+ request.source(new SearchSourceBuilder().size(0));
+ } else {
+ request.source().size(0);
+ }
+ }
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
- if (hasAggs && expectedNumResults > bufferSize) {
+ if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) {
assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
} else {
@@ -354,4 +459,36 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
}
}
+
+ public void testReduceTopNWithFromOffset() {
+ SearchRequest request = new SearchRequest();
+ request.source(new SearchSourceBuilder().size(5).from(5));
+ request.setBatchedReduceSize(randomIntBetween(2, 4));
+ InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+ searchPhaseController.newSearchPhaseResults(request, 4);
+ int score = 100;
+ for (int i = 0; i < 4; i++) {
+ QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i));
+ ScoreDoc[] docs = new ScoreDoc[3];
+ for (int j = 0; j < docs.length; j++) {
+ docs[j] = new ScoreDoc(0, score--);
+ }
+ result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]);
+ result.setShardIndex(i);
+ result.size(5);
+ result.from(5);
+ consumer.consumeResult(result);
+ }
+ // 4*3 results = 12 we get result 5 to 10 here with from=5 and size=5
+
+ SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(5, reduce.scoreDocs.length);
+ assertEquals(100.f, reduce.maxScore, 0.0f);
+ assertEquals(12, reduce.totalHits);
+ assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f);
+ assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f);
+ assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f);
+ assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f);
+ assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f);
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
index f3ff6be1cc..6fc795a882 100644
--- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
@@ -223,8 +223,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
- // the search context should inherit the default timeout
- assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+ try {
+ // the search context should inherit the default timeout
+ assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+ } finally {
+ contextWithDefaultTimeout.decRef();
+ service.freeContext(contextWithDefaultTimeout.id());
+ }
final long seconds = randomIntBetween(6, 10);
final SearchContext context = service.createContext(
@@ -238,8 +243,14 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
- // the search context should inherit the query timeout
- assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+ try {
+ // the search context should inherit the query timeout
+ assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+ } finally {
+ context.decRef();
+ service.freeContext(context.id());
+ }
+
}
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {