summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAreek Zillur <areek.zillur@elasticsearch.com>2016-08-05 17:49:56 -0400
committerAreek Zillur <areek.zillur@elasticsearch.com>2016-08-05 17:51:45 -0400
commitfee013c07c50aaa06415ec4ce4c77f398add0c1e (patch)
tree2a7b018797d2bd2f8f47ce4726b7f0830c7f3e73
parent3be1e7ec35df78c3e697e77ff30281f4ba9d5e72 (diff)
Add support for returning documents with completion suggester
This commit enables completion suggester to return documents associated with suggestions. Now the document source is returned with every suggestion, which respects source filtering options. In case of suggest queries spanning more than one shard, the suggest is executed in two phases, where the last phase fetches the relevant documents from shards, implying executing suggest requests against a single shard is more performant due to the document fetch overhead when the suggest spans multiple shards.
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java18
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java4
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java19
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java11
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java19
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java4
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java11
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchService.java59
-rw-r--r--core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java256
-rw-r--r--core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java3
-rw-r--r--core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java15
-rw-r--r--core/src/main/java/org/elasticsearch/search/suggest/Suggest.java62
-rw-r--r--core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java2
-rw-r--r--core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java131
-rw-r--r--core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java234
-rw-r--r--core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java113
-rw-r--r--core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java73
-rw-r--r--core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java61
-rw-r--r--docs/reference/search/suggesters/completion-suggest.asciidoc25
20 files changed, 918 insertions, 206 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index 642748bd03..f9103f0cdd 100644
--- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -46,6 +46,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
+import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
@@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
- protected volatile ScoreDoc[] sortedShardList;
+ protected volatile ScoreDoc[] sortedShardDocs;
protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
@@ -321,8 +322,11 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
- final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
- if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
+ QuerySearchResult queryResult = entry.value.queryResult().queryResult();
+ final TopDocs topDocs = queryResult.topDocs();
+ final Suggest suggest = queryResult.suggest();
+ if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches
+ ||suggest != null && suggest.hasScoreDocs()) // or had suggest docs
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
@@ -343,12 +347,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
ScoreDoc[] lastEmittedDocPerShard) {
- if (lastEmittedDocPerShard != null) {
- ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
- return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
- } else {
- return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
- }
+ final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null;
+ return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
}
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java
index e19540e26d..8614d7b118 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java
@@ -118,8 +118,8 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
- sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+ sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java
index cf3f971671..9d8305cf6b 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java
@@ -135,18 +135,17 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
void innerExecuteFetchPhase() throws Exception {
- boolean useScroll = request.scroll() != null;
- sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
- searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+ final boolean isScrollRequest = request.scroll() != null;
+ sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
+ searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
- final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
- request, sortedShardList, firstResults.length()
- );
+ final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
+ searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
@@ -196,12 +195,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
+ final boolean isScrollRequest = request.scroll() != null;
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
fetchResults);
- String scrollId = null;
- if (request.scroll() != null) {
- scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
- }
+ String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
index 5d55dd468a..fad4d60275 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
@@ -60,14 +60,11 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
- boolean useScroll = request.scroll() != null;
- sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+ final boolean isScrollRequest = request.scroll() != null;
+ sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
firstResults);
- String scrollId = null;
- if (request.scroll() != null) {
- scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
- }
+ String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java
index a6f9aa26f5..5f90d291dd 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java
@@ -68,18 +68,17 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
@Override
protected void moveToSecondPhase() throws Exception {
- boolean useScroll = request.scroll() != null;
- sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
- searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+ final boolean isScrollRequest = request.scroll() != null;
+ sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+ searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
- final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
- request, sortedShardList, firstResults.length()
- );
+ final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+ searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null;
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
@@ -129,12 +128,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+ final boolean isScrollRequest = request.scroll() != null;
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
fetchResults);
- String scrollId = null;
- if (request.scroll() != null) {
- scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
- }
+ String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
successfulOps.get(), buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
index 94ce1887c3..72154f224d 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
@@ -168,8 +168,8 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
}
private void innerFinishHim() throws Exception {
- ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+ ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
index ac8715eeb9..d9f649a7a5 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
@@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
- private volatile ScoreDoc[] sortedShardList;
+ private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
@@ -165,9 +165,9 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
private void executeFetchPhase() throws Exception {
- sortedShardList = searchPhaseController.sortDocs(true, queryResults);
+ sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
- searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+ searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
@@ -175,7 +175,8 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
- final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
+ final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
+ sortedShardDocs, queryResults.length());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
IntArrayList docIds = entry.value;
@@ -216,7 +217,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
private void innerFinishHim() {
- InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+ InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java
index bfcfcb9d4c..4d618eb057 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -21,6 +21,7 @@ package org.elasticsearch.search;
import com.carrotsearch.hppc.ObjectFloatHashMap;
import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@@ -87,6 +88,8 @@ import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -94,6 +97,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -265,7 +269,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
loadOrExecuteQueryPhase(request, context);
- if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+ if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
@@ -320,7 +324,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
- if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+ if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
@@ -811,40 +815,55 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
- private static final int[] EMPTY_DOC_IDS = new int[0];
-
/**
* Shortcut ids to load, we load only "from" and up to "size". The phase controller
* handles this as well since the result is always size * shards for Q_A_F
*/
private void shortcutDocIdsToLoad(SearchContext context) {
+ final int[] docIdsToLoad;
+ int docsOffset = 0;
+ final Suggest suggest = context.queryResult().suggest();
+ int numSuggestDocs = 0;
+ final List<CompletionSuggestion> completionSuggestions;
+ if (suggest != null && suggest.hasScoreDocs()) {
+ completionSuggestions = suggest.filter(CompletionSuggestion.class);
+ for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+ numSuggestDocs += completionSuggestion.getOptions().size();
+ }
+ } else {
+ completionSuggestions = Collections.emptyList();
+ }
if (context.request().scroll() != null) {
TopDocs topDocs = context.queryResult().topDocs();
- int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
+ docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
- docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
+ docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
- context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
} else {
TopDocs topDocs = context.queryResult().topDocs();
if (topDocs.scoreDocs.length < context.from()) {
// no more docs...
- context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
- return;
- }
- int totalSize = context.from() + context.size();
- int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
- int counter = 0;
- for (int i = context.from(); i < totalSize; i++) {
- if (i < topDocs.scoreDocs.length) {
- docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
- } else {
- break;
+ docIdsToLoad = new int[numSuggestDocs];
+ } else {
+ int totalSize = context.from() + context.size();
+ docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
+ numSuggestDocs];
+ for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
+ docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
- counter++;
}
- context.docIdsToLoad(docIdsToLoad, 0, counter);
}
+ for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+ for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+ docIdsToLoad[docsOffset++] = option.getDoc().doc;
+ }
+ }
+ context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
+ }
+
+ private static boolean hasHits(final QuerySearchResult searchResult) {
+ return searchResult.topDocs().scoreDocs.length > 0 ||
+ (searchResult.suggest() != null && searchResult.suggest().hasScoreDocs());
}
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
diff --git a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
index b2ce044e4f..97f3b191aa 100644
--- a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
@@ -30,7 +30,6 @@ import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
-import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
@@ -53,18 +52,22 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
-import org.elasticsearch.search.profile.query.QueryProfileShardResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.Suggest.Suggestion;
+import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -154,6 +157,10 @@ public class SearchPhaseController extends AbstractComponent {
}
/**
+ * Returns a score doc array of top N search docs across all shards, followed by top suggest docs for each
+ * named completion suggestion across all shards. If more than one named completion suggestion is specified in the
+ * request, the suggest docs for a named suggestion are ordered by the suggestion name.
+ *
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param resultsArr Shard result holder
@@ -191,19 +198,40 @@ public class SearchPhaseController extends AbstractComponent {
offset = 0;
}
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
- if (scoreDocs.length == 0 || scoreDocs.length < offset) {
- return EMPTY_DOCS;
+ ScoreDoc[] docs;
+ int numSuggestDocs = 0;
+ final Suggest suggest = result.queryResult().suggest();
+ final List<CompletionSuggestion> completionSuggestions;
+ if (suggest != null) {
+ completionSuggestions = suggest.filter(CompletionSuggestion.class);
+ for (CompletionSuggestion suggestion : completionSuggestions) {
+ numSuggestDocs += suggestion.getOptions().size();
+ }
+ } else {
+ completionSuggestions = Collections.emptyList();
}
-
- int resultDocsSize = result.size();
- if ((scoreDocs.length - offset) < resultDocsSize) {
- resultDocsSize = scoreDocs.length - offset;
+ int docsOffset = 0;
+ if (scoreDocs.length == 0 || scoreDocs.length < offset) {
+ docs = new ScoreDoc[numSuggestDocs];
+ } else {
+ int resultDocsSize = result.size();
+ if ((scoreDocs.length - offset) < resultDocsSize) {
+ resultDocsSize = scoreDocs.length - offset;
+ }
+ docs = new ScoreDoc[resultDocsSize + numSuggestDocs];
+ for (int i = 0; i < resultDocsSize; i++) {
+ ScoreDoc scoreDoc = scoreDocs[offset + i];
+ scoreDoc.shardIndex = shardIndex;
+ docs[i] = scoreDoc;
+ docsOffset++;
+ }
}
- ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
- for (int i = 0; i < resultDocsSize; i++) {
- ScoreDoc scoreDoc = scoreDocs[offset + i];
- scoreDoc.shardIndex = shardIndex;
- docs[i] = scoreDoc;
+ for (CompletionSuggestion suggestion: completionSuggestions) {
+ for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
+ ScoreDoc doc = option.getDoc();
+ doc.shardIndex = shardIndex;
+ docs[docsOffset++] = doc;
+ }
}
return docs;
}
@@ -213,13 +241,7 @@ public class SearchPhaseController extends AbstractComponent {
Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
QuerySearchResultProvider firstResult = sortedResults[0].value;
- int topN = firstResult.queryResult().size();
- if (firstResult.includeFetch()) {
- // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
- // this is also important since we shortcut and fetch only docs from "from" and up to "size"
- topN *= sortedResults.length;
- }
-
+ int topN = topN(results);
int from = firstResult.queryResult().from();
if (ignoreFrom) {
from = 0;
@@ -258,40 +280,86 @@ public class SearchPhaseController extends AbstractComponent {
}
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
}
- return mergedTopDocs.scoreDocs;
- }
- public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) {
- if (request.scroll() != null) {
- return getLastEmittedDocPerShard(sortedShardList, numShards);
- } else {
- return null;
+ ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
+ final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
+ // group suggestions and assign shard index
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+ Suggest shardSuggest = sortedResult.value.queryResult().suggest();
+ if (shardSuggest != null) {
+ for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+ suggestion.setShardIndex(sortedResult.index);
+ List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+ groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggestions.add(suggestion);
+ }
+ }
+ }
+ if (groupedCompletionSuggestions.isEmpty() == false) {
+ int numSuggestDocs = 0;
+ List<Suggestion<? extends Entry<? extends Entry.Option>>> completionSuggestions =
+ new ArrayList<>(groupedCompletionSuggestions.size());
+ for (List<Suggestion<CompletionSuggestion.Entry>> groupedSuggestions : groupedCompletionSuggestions.values()) {
+ final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions);
+ assert completionSuggestion != null;
+ numSuggestDocs += completionSuggestion.getOptions().size();
+ completionSuggestions.add(completionSuggestion);
+ }
+ scoreDocs = new ScoreDoc[mergedTopDocs.scoreDocs.length + numSuggestDocs];
+ System.arraycopy(mergedTopDocs.scoreDocs, 0, scoreDocs, 0, mergedTopDocs.scoreDocs.length);
+ int offset = mergedTopDocs.scoreDocs.length;
+ Suggest suggestions = new Suggest(completionSuggestions);
+ for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) {
+ for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+ scoreDocs[offset++] = option.getDoc();
+ }
+ }
}
+ return scoreDocs;
}
- public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) {
+ public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+ ScoreDoc[] sortedScoreDocs, int numShards) {
ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
- for (ScoreDoc scoreDoc : sortedShardList) {
- lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
+ if (queryResults.isEmpty() == false) {
+ long fetchHits = 0;
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
+ fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length;
+ }
+ // from is always zero as when we use scroll, we ignore from
+ long size = Math.min(fetchHits, topN(queryResults));
+ for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
+ ScoreDoc scoreDoc = sortedScoreDocs[sortedDocsIndex];
+ lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
+ }
}
return lastEmittedDocPerShard;
+
}
/**
* Builds an array, with potential null elements, with docs to load.
*/
- public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
+ public void fillDocIdsToLoad(AtomicArray<IntArrayList> docIdsToLoad, ScoreDoc[] shardDocs) {
for (ScoreDoc shardDoc : shardDocs) {
- IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
- if (list == null) {
- list = new IntArrayList(); // can't be shared!, uses unsafe on it later on
- docsIdsToLoad.set(shardDoc.shardIndex, list);
+ IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex);
+ if (shardDocIdsToLoad == null) {
+ shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on
+ docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad);
}
- list.add(shardDoc.doc);
+ shardDocIdsToLoad.add(shardDoc.doc);
}
}
- public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
+ /**
+ * Enriches search hits and completion suggestion hits from <code>sortedDocs</code> using <code>fetchResultsArr</code>,
+ * merges suggestions, aggregations and profile results
+ *
+ * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
+ * completion suggestion ordered by suggestion name
+ */
+ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
+ AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
@@ -317,6 +385,7 @@ public class SearchPhaseController extends AbstractComponent {
// count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
long totalHits = 0;
+ long fetchHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
@@ -333,6 +402,7 @@ public class SearchPhaseController extends AbstractComponent {
}
}
totalHits += result.topDocs().totalHits;
+ fetchHits += result.topDocs().scoreDocs.length;
if (!Float.isNaN(result.topDocs().getMaxScore())) {
maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
}
@@ -345,11 +415,13 @@ public class SearchPhaseController extends AbstractComponent {
for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
entry.value.fetchResult().initCounter();
}
-
+ int from = ignoreFrom ? 0 : firstResult.queryResult().from();
+ int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults));
// merge hits
List<InternalSearchHit> hits = new ArrayList<>();
if (!fetchResults.isEmpty()) {
- for (ScoreDoc shardDoc : sortedDocs) {
+ for (int i = 0; i < numSearchHits; i++) {
+ ScoreDoc shardDoc = sortedDocs[i];
FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
if (fetchResultProvider == null) {
continue;
@@ -360,7 +432,6 @@ public class SearchPhaseController extends AbstractComponent {
InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
searchHit.score(shardDoc.score);
searchHit.shard(fetchResult.shardTarget());
-
if (sorted) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
searchHit.sortValues(fieldDoc.fields, firstResult.sortValueFormats());
@@ -368,7 +439,6 @@ public class SearchPhaseController extends AbstractComponent {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
}
-
hits.add(searchHit);
}
}
@@ -376,38 +446,72 @@ public class SearchPhaseController extends AbstractComponent {
// merge suggest results
Suggest suggest = null;
- if (!queryResults.isEmpty()) {
- final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
- boolean hasSuggestions = false;
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
- Suggest shardResult = entry.value.queryResult().queryResult().suggest();
-
- if (shardResult == null) {
- continue;
+ if (firstResult.suggest() != null) {
+ final Map<String, List<Suggestion>> groupedSuggestions = new HashMap<>();
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
+ Suggest shardSuggest = queryResult.value.queryResult().suggest();
+ if (shardSuggest != null) {
+ for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : shardSuggest) {
+ List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggestionList.add(suggestion);
+ }
+ }
+ }
+ if (groupedSuggestions.isEmpty() == false) {
+ suggest = new Suggest(Suggest.reduce(groupedSuggestions));
+ if (!fetchResults.isEmpty()) {
+ int currentOffset = numSearchHits;
+ for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) {
+ final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
+ for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
+ ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
+ FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+ if (fetchSearchResultProvider == null) {
+ continue;
+ }
+ FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult();
+ int fetchResultIndex = fetchResult.counterGetAndIncrement();
+ if (fetchResultIndex < fetchResult.hits().internalHits().length) {
+ InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
+ CompletionSuggestion.Entry.Option suggestOption =
+ suggestionOptions.get(scoreDocIndex - currentOffset);
+ hit.score(shardDoc.score);
+ hit.shard(fetchResult.shardTarget());
+ suggestOption.setHit(hit);
+ }
+ }
+ currentOffset += suggestionOptions.size();
+ }
+ assert currentOffset == sortedDocs.length : "expected no more score doc slices";
}
- hasSuggestions = true;
- Suggest.group(groupedSuggestions, shardResult);
}
-
- suggest = hasSuggestions ? new Suggest(Suggest.reduce(groupedSuggestions)) : null;
}
- // merge addAggregation
+ // merge Aggregation
InternalAggregations aggregations = null;
- if (!queryResults.isEmpty()) {
- if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
- List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
- aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
+ if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
+ List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
+ aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
+ }
+ ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
+ aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+ List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
+ if (pipelineAggregators != null) {
+ List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
+ .map((p) -> (InternalAggregation) p)
+ .collect(Collectors.toList());
+ for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
+ InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
+ newAggs.add(newAgg);
}
- ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
- aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+ aggregations = new InternalAggregations(newAggs);
}
}
//Collect profile results
SearchProfileShardResults shardResults = null;
- if (!queryResults.isEmpty() && firstResult.profileResults() != null) {
+ if (firstResult.profileResults() != null) {
Map<String, ProfileShardResult> profileResults = new HashMap<>(queryResults.size());
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
String key = entry.value.queryResult().shardTarget().toString();
@@ -416,24 +520,22 @@ public class SearchPhaseController extends AbstractComponent {
shardResults = new SearchProfileShardResults(profileResults);
}
- if (aggregations != null) {
- List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
- if (pipelineAggregators != null) {
- List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false).map((p) -> {
- return (InternalAggregation) p;
- }).collect(Collectors.toList());
- for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
- ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
- InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
- newAggs.add(newAgg);
- }
- aggregations = new InternalAggregations(newAggs);
- }
- }
-
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
}
+ /**
+ * returns the number of top results to be considered across all shards
+ */
+ private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+ QuerySearchResultProvider firstResult = queryResults.get(0).value;
+ int topN = firstResult.queryResult().size();
+ if (firstResult.includeFetch()) {
+ // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
+ // this is also important since we shortcut and fetch only docs from "from" and up to "size"
+ topN *= queryResults.size();
+ }
+ return topN;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java
index d908aca0fc..f6738f9972 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java
@@ -39,10 +39,7 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
private OriginalIndices originalIndices;
public ShardFetchSearchRequest() {
- }
- public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list) {
- this(request, id, list, null);
}
public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java
index 59225f93a6..17f5e5ac70 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java
@@ -43,7 +43,9 @@ public final class MatchedQueriesFetchSubPhase implements FetchSubPhase {
@Override
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
- if (hits.length == 0) {
+ if (hits.length == 0 ||
+ // in case the request has only suggest, parsed query is null
+ context.parsedQuery() == null) {
return;
}
hits = hits.clone(); // don't modify the incoming hits
diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
index 191537b4de..e1d46dd5fd 100644
--- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
+++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
@@ -415,8 +415,8 @@ public class InternalSearchHit implements SearchHit {
static final String INNER_HITS = "inner_hits";
}
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ // public because we render hit as part of completion suggestion option
+ public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException {
List<SearchHitField> metaFields = new ArrayList<>();
List<SearchHitField> otherFields = new ArrayList<>();
if (fields != null && !fields.isEmpty()) {
@@ -432,7 +432,6 @@ public class InternalSearchHit implements SearchHit {
}
}
- builder.startObject();
// For inner_hit hits shard is null and that is ok, because the parent search hit has all this information.
// Even if this was included in the inner_hit hits this would be the same, so better leave it out.
if (explanation() != null && shard != null) {
@@ -516,7 +515,6 @@ public class InternalSearchHit implements SearchHit {
}
builder.endObject();
}
- builder.endObject();
return builder;
}
@@ -533,6 +531,15 @@ public class InternalSearchHit implements SearchHit {
builder.endArray();
}
builder.endObject();
+
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ toInnerXContent(builder, params);
+ builder.endObject();
+ return builder;
}
public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java
index f8fbdaf969..95612693f8 100644
--- a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java
+++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Top level suggest result, containing the result for each suggestion.
@@ -48,18 +49,16 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
private static final String NAME = "suggest";
- private static final Comparator<Option> COMPARATOR = new Comparator<Suggest.Suggestion.Entry.Option>() {
- @Override
- public int compare(Option first, Option second) {
- int cmp = Float.compare(second.getScore(), first.getScore());
- if (cmp != 0) {
- return cmp;
- }
- return first.getText().compareTo(second.getText());
- }
- };
+ public static final Comparator<Option> COMPARATOR = (first, second) -> {
+ int cmp = Float.compare(second.getScore(), first.getScore());
+ if (cmp != 0) {
+ return cmp;
+ }
+ return first.getText().compareTo(second.getText());
+ };
private List<Suggestion<? extends Entry<? extends Option>>> suggestions;
+ private boolean hasScoreDocs;
private Map<String, Suggestion<? extends Entry<? extends Option>>> suggestMap;
@@ -68,7 +67,12 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
}
public Suggest(List<Suggestion<? extends Entry<? extends Option>>> suggestions) {
+ // we sort suggestions by their names to ensure iteration over suggestions are consistent
+ // this is needed as we need to fill in suggestion docs in SearchPhaseController#sortDocs
+ // in the same order as we enrich the suggestions with fetch results in SearchPhaseController#merge
+ suggestions.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
this.suggestions = suggestions;
+ this.hasScoreDocs = filter(CompletionSuggestion.class).stream().anyMatch(CompletionSuggestion::hasScoreDocs);
}
@Override
@@ -97,6 +101,13 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
return (T) suggestMap.get(name);
}
+ /**
+ * Whether any suggestions had query hits
+ */
+ public boolean hasScoreDocs() {
+ return hasScoreDocs;
+ }
+
@Override
public void readFrom(StreamInput in) throws IOException {
final int size = in.readVInt();
@@ -125,6 +136,7 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
suggestion.readFrom(in);
suggestions.add(suggestion);
}
+ hasScoreDocs = filter(CompletionSuggestion.class).stream().anyMatch(CompletionSuggestion::hasScoreDocs);
}
@Override
@@ -160,18 +172,6 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
return result;
}
- public static Map<String, List<Suggest.Suggestion>> group(Map<String, List<Suggest.Suggestion>> groupedSuggestions, Suggest suggest) {
- for (Suggestion<? extends Entry<? extends Option>> suggestion : suggest) {
- List<Suggestion> list = groupedSuggestions.get(suggestion.getName());
- if (list == null) {
- list = new ArrayList<>();
- groupedSuggestions.put(suggestion.getName(), list);
- }
- list.add(suggestion);
- }
- return groupedSuggestions;
- }
-
public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions) {
List<Suggestion<? extends Entry<? extends Option>>> reduced = new ArrayList<>(groupedSuggestions.size());
for (java.util.Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
@@ -194,6 +194,16 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
}
/**
+ * @return only suggestions of type <code>suggestionType</code> contained in this {@link Suggest} instance
+ */
+ public <T extends Suggestion> List<T> filter(Class<T> suggestionType) {
+ return suggestions.stream()
+ .filter(suggestion -> suggestion.getClass() == suggestionType)
+ .map(suggestion -> (T) suggestion)
+ .collect(Collectors.toList());
+ }
+
+ /**
* The suggestion responses corresponding with the suggestions in the request.
*/
public static class Suggestion<T extends Suggestion.Entry> implements Iterable<T>, Streamable, ToXContent {
@@ -239,6 +249,13 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
}
/**
+ * @return The number of requested suggestion option size
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
* Merges the result of another suggestion into this suggestion.
* For internal usage.
*/
@@ -331,7 +348,6 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
return builder;
}
-
/**
* Represents a part from the suggest text with suggested options.
*/
diff --git a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java
index 759ab8d206..c27f378915 100644
--- a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java
+++ b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java
@@ -109,7 +109,7 @@ public class CompletionSuggester extends Suggester<CompletionSuggestionContext>
}
}
if (numResult++ < suggestionContext.getSize()) {
- CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(
+ CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(suggestDoc.doc,
new Text(suggestDoc.key.toString()), suggestDoc.score, contexts, payload);
completionSuggestEntry.addOption(option);
} else {
diff --git a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
index 0c209e00a7..a92cbfe1e2 100644
--- a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
+++ b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
@@ -18,11 +18,16 @@
*/
package org.elasticsearch.search.suggest.completion;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.suggest.Lookup;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.internal.InternalSearchHit;
+import org.elasticsearch.search.internal.InternalSearchHits;
+import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
@@ -35,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.elasticsearch.search.suggest.Suggest.COMPARATOR;
+
/**
* Suggestion response for {@link CompletionSuggester} results
*
@@ -62,6 +69,25 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
super(name, size);
}
+ /**
+ * @return the result options for the suggestion
+ */
+ public List<Entry.Option> getOptions() {
+ if (entries.isEmpty() == false) {
+ assert entries.size() == 1 : "CompletionSuggestion must have only one entry";
+ return entries.get(0).getOptions();
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * @return whether there is any hits for the suggestion
+ */
+ public boolean hasScoreDocs() {
+ return getOptions().size() > 0;
+ }
+
private static final class OptionPriorityQueue extends org.apache.lucene.util.PriorityQueue<Entry.Option> {
private final Comparator<Suggest.Suggestion.Entry.Option> comparator;
@@ -90,30 +116,54 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
}
}
- @Override
- public Suggest.Suggestion<Entry> reduce(List<Suggest.Suggestion<Entry>> toReduce) {
- if (toReduce.size() == 1) {
- return toReduce.get(0);
+ /**
+ * Reduces suggestions to a single suggestion containing at most
+ * top {@link CompletionSuggestion#getSize()} options across <code>toReduce</code>
+ */
+ public static CompletionSuggestion reduceTo(List<Suggest.Suggestion<Entry>> toReduce) {
+ if (toReduce.isEmpty()) {
+ return null;
} else {
- // combine suggestion entries from participating shards on the coordinating node
- // the global top <code>size</code> entries are collected from the shard results
- // using a priority queue
- OptionPriorityQueue priorityQueue = new OptionPriorityQueue(size, sortComparator());
- for (Suggest.Suggestion<Entry> entries : toReduce) {
- assert entries.getEntries().size() == 1 : "CompletionSuggestion must have only one entry";
- for (Entry.Option option : entries.getEntries().get(0)) {
- if (option == priorityQueue.insertWithOverflow(option)) {
- // if the current option has overflown from pq,
- // we can assume all of the successive options
- // from this shard result will be overflown as well
- break;
+ final CompletionSuggestion leader = (CompletionSuggestion) toReduce.get(0);
+ final Entry leaderEntry = leader.getEntries().get(0);
+ final String name = leader.getName();
+ if (toReduce.size() == 1) {
+ return leader;
+ } else {
+ // combine suggestion entries from participating shards on the coordinating node
+ // the global top <code>size</code> entries are collected from the shard results
+ // using a priority queue
+ OptionPriorityQueue priorityQueue = new OptionPriorityQueue(leader.getSize(), COMPARATOR);
+ for (Suggest.Suggestion<Entry> suggestion : toReduce) {
+ assert suggestion.getName().equals(name) : "name should be identical across all suggestions";
+ for (Entry.Option option : ((CompletionSuggestion) suggestion).getOptions()) {
+ if (option == priorityQueue.insertWithOverflow(option)) {
+ // if the current option has overflown from pq,
+ // we can assume all of the successive options
+ // from this shard result will be overflown as well
+ break;
+ }
}
}
+ final CompletionSuggestion suggestion = new CompletionSuggestion(leader.getName(), leader.getSize());
+ final Entry entry = new Entry(leaderEntry.getText(), leaderEntry.getOffset(), leaderEntry.getLength());
+ Collections.addAll(entry.getOptions(), priorityQueue.get());
+ suggestion.addTerm(entry);
+ return suggestion;
+ }
+ }
+ }
+
+ @Override
+ public Suggest.Suggestion<Entry> reduce(List<Suggest.Suggestion<Entry>> toReduce) {
+ return reduceTo(toReduce);
+ }
+
+ public void setShardIndex(int shardIndex) {
+ if (entries.isEmpty() == false) {
+ for (Entry.Option option : getOptions()) {
+ option.setShardIndex(shardIndex);
}
- Entry options = this.entries.get(0);
- options.getOptions().clear();
- Collections.addAll(options.getOptions(), priorityQueue.get());
- return this;
}
}
@@ -145,9 +195,12 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
public static class Option extends Suggest.Suggestion.Entry.Option {
private Map<String, Set<CharSequence>> contexts;
private Map<String, List<Object>> payload;
+ private ScoreDoc doc;
+ private InternalSearchHit hit;
- public Option(Text text, float score, Map<String, Set<CharSequence>> contexts, Map<String, List<Object>> payload) {
+ public Option(int docID, Text text, float score, Map<String, Set<CharSequence>> contexts, Map<String, List<Object>> payload) {
super(text, score);
+ this.doc = new ScoreDoc(docID, score);
this.payload = payload;
this.contexts = contexts;
}
@@ -171,14 +224,30 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
return contexts;
}
- @Override
- public void setScore(float score) {
- super.setScore(score);
+ public ScoreDoc getDoc() {
+ return doc;
+ }
+
+ public InternalSearchHit getHit() {
+ return hit;
+ }
+
+ public void setShardIndex(int shardIndex) {
+ this.doc.shardIndex = shardIndex;
+ }
+
+ public void setHit(InternalSearchHit hit) {
+ this.hit = hit;
}
@Override
protected XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
- super.innerToXContent(builder, params);
+ builder.field("text", getText());
+ if (hit != null) {
+ hit.toInnerXContent(builder, params);
+ } else {
+ builder.field("score", getScore());
+ }
if (payload.size() > 0) {
builder.startObject("payload");
for (Map.Entry<String, List<Object>> entry : payload.entrySet()) {
@@ -207,6 +276,11 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
+ this.doc = Lucene.readScoreDoc(in);
+ if (in.readBoolean()) {
+ this.hit = InternalSearchHit.readSearchHit(in,
+ InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+ }
int payloadSize = in.readInt();
this.payload = new LinkedHashMap<>(payloadSize);
for (int i = 0; i < payloadSize; i++) {
@@ -234,6 +308,13 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
+ Lucene.writeScoreDoc(out, doc);
+ if (hit != null) {
+ out.writeBoolean(true);
+ hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+ } else {
+ out.writeBoolean(false);
+ }
out.writeInt(payload.size());
for (Map.Entry<String, List<Object>> entry : payload.entrySet()) {
out.writeString(entry.getKey());
diff --git a/core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java
new file mode 100644
index 0000000000..301617a0b2
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.controller;
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.fetch.FetchSearchResultProvider;
+import org.elasticsearch.search.internal.InternalSearchHit;
+import org.elasticsearch.search.internal.InternalSearchHits;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.query.QuerySearchResult;
+import org.elasticsearch.search.query.QuerySearchResultProvider;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class SearchPhaseControllerTests extends ESTestCase {
+ private SearchPhaseController searchPhaseController;
+
+ @Before
+ public void setup() {
+ searchPhaseController = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null, null);
+ }
+
+ public void testSort() throws Exception {
+ List<CompletionSuggestion> suggestions = new ArrayList<>();
+ for (int i = 0; i < randomIntBetween(1, 5); i++) {
+ suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+ }
+ int nShards = randomIntBetween(1, 20);
+ int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
+ AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize);
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results);
+ int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+ for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
+ int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
+ accumulatedLength += suggestionSize;
+ }
+ assertThat(sortedDocs.length, equalTo(accumulatedLength));
+ }
+
+ public void testMerge() throws IOException {
+ List<CompletionSuggestion> suggestions = new ArrayList<>();
+ for (int i = 0; i < randomIntBetween(1, 5); i++) {
+ suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+ }
+ int nShards = randomIntBetween(1, 20);
+ int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
+ AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize);
+
+ // calculate offsets and score doc array
+ List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
+ ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
+ mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
+ Suggest mergedSuggest = reducedSuggest(queryResults);
+ for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ if (suggestion instanceof CompletionSuggestion) {
+ CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
+ mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
+ .map(CompletionSuggestion.Entry.Option::getDoc)
+ .collect(Collectors.toList()));
+ }
+ }
+ ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
+
+ InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, queryResults,
+ generateFetchResults(nShards, mergedSearchDocs, mergedSuggest));
+ assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+ Suggest suggestResult = mergedResponse.suggest();
+ for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ assertThat(suggestion, instanceOf(CompletionSuggestion.class));
+ if (suggestion.getEntries().get(0).getOptions().size() > 0) {
+ CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
+ assertNotNull(suggestionResult);
+ List<CompletionSuggestion.Entry.Option> options = suggestionResult.getEntries().get(0).getOptions();
+ assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
+ for (CompletionSuggestion.Entry.Option option : options) {
+ assertNotNull(option.getHit());
+ }
+ }
+ }
+ }
+
+ private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
+ List<CompletionSuggestion> suggestions,
+ int searchHitsSize) {
+ AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards);
+ for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
+ QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
+ new SearchShardTarget("", new Index("", ""), shardIndex));
+ TopDocs topDocs = new TopDocs(0, new ScoreDoc[0], 0);
+ if (searchHitsSize > 0) {
+ int nDocs = randomIntBetween(0, searchHitsSize);
+ ScoreDoc[] scoreDocs = new ScoreDoc[nDocs];
+ float maxScore = 0F;
+ for (int i = 0; i < nDocs; i++) {
+ float score = Math.abs(randomFloat());
+ scoreDocs[i] = new ScoreDoc(i, score);
+ if (score > maxScore) {
+ maxScore = score;
+ }
+ }
+ topDocs = new TopDocs(scoreDocs.length, scoreDocs, maxScore);
+ }
+ List<CompletionSuggestion> shardSuggestion = new ArrayList<>();
+ for (CompletionSuggestion completionSuggestion : suggestions) {
+ CompletionSuggestion suggestion = new CompletionSuggestion(
+ completionSuggestion.getName(), completionSuggestion.getSize());
+ final CompletionSuggestion.Entry completionEntry = new CompletionSuggestion.Entry(new Text(""), 0, 5);
+ suggestion.addTerm(completionEntry);
+ int optionSize = randomIntBetween(1, suggestion.getSize());
+ float maxScore = randomIntBetween(suggestion.getSize(), (int) Float.MAX_VALUE);
+ for (int i = 0; i < optionSize; i++) {
+ completionEntry.addOption(new CompletionSuggestion.Entry.Option(i, new Text(""), maxScore,
+ Collections.emptyMap(), Collections.emptyMap()));
+ float dec = randomIntBetween(0, optionSize);
+ if (dec <= maxScore) {
+ maxScore -= dec;
+ }
+ }
+ suggestion.setShardIndex(shardIndex);
+ shardSuggestion.add(suggestion);
+ }
+ querySearchResult.topDocs(topDocs, null);
+ querySearchResult.size(searchHitsSize);
+ querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion)));
+ queryResults.set(shardIndex, querySearchResult);
+ }
+ return queryResults;
+ }
+
+ private int getTotalQueryHits(AtomicArray<QuerySearchResultProvider> results) {
+ int resultCount = 0;
+ for (AtomicArray.Entry<QuerySearchResultProvider> shardResult : results.asList()) {
+ resultCount += shardResult.value.queryResult().topDocs().totalHits;
+ }
+ return resultCount;
+ }
+
+ private Suggest reducedSuggest(AtomicArray<QuerySearchResultProvider> results) {
+ Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>();
+ for (AtomicArray.Entry<QuerySearchResultProvider> entry : results.asList()) {
+ for (Suggest.Suggestion<?> suggestion : entry.value.queryResult().suggest()) {
+ List<Suggest.Suggestion<CompletionSuggestion.Entry>> suggests =
+ groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+ suggests.add((Suggest.Suggestion<CompletionSuggestion.Entry>) suggestion);
+ }
+ }
+ return new Suggest(groupedSuggestion.values().stream().map(CompletionSuggestion::reduceTo)
+ .collect(Collectors.toList()));
+ }
+
+ private ScoreDoc[] getTopShardDocs(AtomicArray<QuerySearchResultProvider> results) throws IOException {
+ List<AtomicArray.Entry<QuerySearchResultProvider>> resultList = results.asList();
+ TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
+ for (int i = 0; i < resultList.size(); i++) {
+ shardTopDocs[i] = resultList.get(i).value.queryResult().topDocs();
+ }
+ int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
+ return TopDocs.merge(topN, shardTopDocs).scoreDocs;
+ }
+
+ private AtomicArray<FetchSearchResultProvider> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
+ AtomicArray<FetchSearchResultProvider> fetchResults = new AtomicArray<>(nShards);
+ for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
+ float maxScore = -1F;
+ SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex);
+ FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
+ List<InternalSearchHit> internalSearchHits = new ArrayList<>();
+ for (ScoreDoc scoreDoc : mergedSearchDocs) {
+ if (scoreDoc.shardIndex == shardIndex) {
+ internalSearchHits.add(new InternalSearchHit(scoreDoc.doc, "", new Text(""), Collections.emptyMap()));
+ if (scoreDoc.score > maxScore) {
+ maxScore = scoreDoc.score;
+ }
+ }
+ }
+ for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+ if (suggestion instanceof CompletionSuggestion) {
+ for (CompletionSuggestion.Entry.Option option : ((CompletionSuggestion) suggestion).getOptions()) {
+ ScoreDoc doc = option.getDoc();
+ if (doc.shardIndex == shardIndex) {
+ internalSearchHits.add(new InternalSearchHit(doc.doc, "", new Text(""), Collections.emptyMap()));
+ if (doc.score > maxScore) {
+ maxScore = doc.score;
+ }
+ }
+ }
+ }
+ }
+ InternalSearchHit[] hits = internalSearchHits.toArray(new InternalSearchHit[internalSearchHits.size()]);
+ fetchSearchResult.hits(new InternalSearchHits(hits, hits.length, maxScore));
+ fetchResults.set(shardIndex, fetchSearchResult);
+ }
+ return fetchResults;
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java b/core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java
index 93614ddbb1..547093df63 100644
--- a/core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java
+++ b/core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java
@@ -63,6 +63,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -72,6 +73,9 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHit;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasScore;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -80,6 +84,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@SuppressCodecs("*") // requires custom completion format
@@ -391,6 +396,114 @@ public class CompletionSuggestSearchIT extends ESIntegTestCase {
}
}
+ public void testSuggestDocument() throws Exception {
+ final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+ createIndexAndMapping(mapping);
+ int numDocs = randomIntBetween(10, 100);
+ List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+ for (int i = 1; i <= numDocs; i++) {
+ indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+ .setSource(jsonBuilder()
+ .startObject()
+ .startObject(FIELD)
+ .field("input", "suggestion" + i)
+ .field("weight", i)
+ .endObject()
+ .endObject()
+ ));
+ }
+ indexRandom(true, indexRequestBuilders);
+ CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+ SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(new SuggestBuilder().addSuggestion("foo", prefix)).get();
+ CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+ CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+ assertThat(options.getOptions().size(), equalTo(numDocs));
+ int id = numDocs;
+ for (CompletionSuggestion.Entry.Option option : options) {
+ assertThat(option.getText().toString(), equalTo("suggestion" + id));
+ assertSearchHit(option.getHit(), hasId("" + id));
+ assertSearchHit(option.getHit(), hasScore(((float) id)));
+ assertNotNull(option.getHit().source());
+ id--;
+ }
+ }
+
+ public void testSuggestDocumentNoSource() throws Exception {
+ final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+ createIndexAndMapping(mapping);
+ int numDocs = randomIntBetween(10, 100);
+ List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+ for (int i = 1; i <= numDocs; i++) {
+ indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+ .setSource(jsonBuilder()
+ .startObject()
+ .startObject(FIELD)
+ .field("input", "suggestion" + i)
+ .field("weight", i)
+ .endObject()
+ .endObject()
+ ));
+ }
+ indexRandom(true, indexRequestBuilders);
+ CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+ SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(
+ new SuggestBuilder().addSuggestion("foo", prefix)
+ ).setFetchSource(false).get();
+ CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+ CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+ assertThat(options.getOptions().size(), equalTo(numDocs));
+ int id = numDocs;
+ for (CompletionSuggestion.Entry.Option option : options) {
+ assertThat(option.getText().toString(), equalTo("suggestion" + id));
+ assertSearchHit(option.getHit(), hasId("" + id));
+ assertSearchHit(option.getHit(), hasScore(((float) id)));
+ assertNull(option.getHit().source());
+ id--;
+ }
+ }
+
+ public void testSuggestDocumentSourceFiltering() throws Exception {
+ final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+ createIndexAndMapping(mapping);
+ int numDocs = randomIntBetween(10, 100);
+ List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+ for (int i = 1; i <= numDocs; i++) {
+ indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+ .setSource(jsonBuilder()
+ .startObject()
+ .startObject(FIELD)
+ .field("input", "suggestion" + i)
+ .field("weight", i)
+ .endObject()
+ .field("a", "include")
+ .field("b", "exclude")
+ .endObject()
+ ));
+ }
+ indexRandom(true, indexRequestBuilders);
+ CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+ SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(
+ new SuggestBuilder().addSuggestion("foo", prefix)
+ ).setFetchSource("a", "b").get();
+ CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+ CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+ assertThat(options.getOptions().size(), equalTo(numDocs));
+ int id = numDocs;
+ for (CompletionSuggestion.Entry.Option option : options) {
+ assertThat(option.getText().toString(), equalTo("suggestion" + id));
+ assertSearchHit(option.getHit(), hasId("" + id));
+ assertSearchHit(option.getHit(), hasScore(((float) id)));
+ assertNotNull(option.getHit().source());
+ Set<String> sourceFields = option.getHit().sourceAsMap().keySet();
+ assertThat(sourceFields, contains("a"));
+ assertThat(sourceFields, not(contains("b")));
+ id--;
+ }
+ }
+
public void testThatWeightsAreWorking() throws Exception {
createIndexAndMapping(completionMappingBuilder);
diff --git a/core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java b/core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java
new file mode 100644
index 0000000000..bed6d5997d
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.suggest;
+
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
+import org.elasticsearch.search.suggest.phrase.PhraseSuggestion;
+import org.elasticsearch.search.suggest.term.TermSuggestion;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class SuggestTests extends ESTestCase {
+
+ public void testFilter() throws Exception {
+ List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> suggestions;
+ CompletionSuggestion completionSuggestion = new CompletionSuggestion(randomAsciiOfLength(10), 2);
+ PhraseSuggestion phraseSuggestion = new PhraseSuggestion(randomAsciiOfLength(10), 2);
+ TermSuggestion termSuggestion = new TermSuggestion(randomAsciiOfLength(10), 2, SortBy.SCORE);
+ suggestions = Arrays.asList(completionSuggestion, phraseSuggestion, termSuggestion);
+ Suggest suggest = new Suggest(suggestions);
+ List<PhraseSuggestion> phraseSuggestions = suggest.filter(PhraseSuggestion.class);
+ assertThat(phraseSuggestions.size(), equalTo(1));
+ assertThat(phraseSuggestions.get(0), equalTo(phraseSuggestion));
+ List<TermSuggestion> termSuggestions = suggest.filter(TermSuggestion.class);
+ assertThat(termSuggestions.size(), equalTo(1));
+ assertThat(termSuggestions.get(0), equalTo(termSuggestion));
+ List<CompletionSuggestion> completionSuggestions = suggest.filter(CompletionSuggestion.class);
+ assertThat(completionSuggestions.size(), equalTo(1));
+ assertThat(completionSuggestions.get(0), equalTo(completionSuggestion));
+ }
+
+ public void testSuggestionOrdering() throws Exception {
+ List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> suggestions;
+ suggestions = new ArrayList<>();
+ int n = randomIntBetween(2, 5);
+ for (int i = 0; i < n; i++) {
+ suggestions.add(new CompletionSuggestion(randomAsciiOfLength(10), randomIntBetween(3, 5)));
+ }
+ Collections.shuffle(suggestions, random());
+ Suggest suggest = new Suggest(suggestions);
+ List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> sortedSuggestions;
+ sortedSuggestions = new ArrayList<>(suggestions);
+ sortedSuggestions.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
+ List<CompletionSuggestion> completionSuggestions = suggest.filter(CompletionSuggestion.class);
+ assertThat(completionSuggestions.size(), equalTo(n));
+ for (int i = 0; i < n; i++) {
+ assertThat(completionSuggestions.get(i).getName(), equalTo(sortedSuggestions.get(i).getName()));
+ }
+ }
+
+}
diff --git a/core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java b/core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java
new file mode 100644
index 0000000000..5f2f84bc04
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.suggest.completion;
+
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+public class CompletionSuggestionTests extends ESTestCase {
+
+ public void testToReduce() throws Exception {
+ List<Suggest.Suggestion<CompletionSuggestion.Entry>> shardSuggestions = new ArrayList<>();
+ int nShards = randomIntBetween(1, 10);
+ String name = randomAsciiOfLength(10);
+ int size = randomIntBetween(3, 5);
+ for (int i = 0; i < nShards; i++) {
+ CompletionSuggestion suggestion = new CompletionSuggestion(name, size);
+ suggestion.addTerm(new CompletionSuggestion.Entry(new Text(""), 0, 0));
+ shardSuggestions.add(suggestion);
+ }
+ int totalResults = randomIntBetween(0, 5) * nShards;
+ float maxScore = randomIntBetween(totalResults, totalResults*2);
+ for (int i = 0; i < totalResults; i++) {
+ Suggest.Suggestion<CompletionSuggestion.Entry> suggestion = randomFrom(shardSuggestions);
+ suggestion.getEntries().get(0).addOption(new CompletionSuggestion.Entry.Option(i, new Text(""),
+ maxScore - i, Collections.emptyMap(), Collections.emptyMap()));
+ }
+ CompletionSuggestion reducedSuggestion = CompletionSuggestion.reduceTo(shardSuggestions);
+ assertNotNull(reducedSuggestion);
+ assertThat(reducedSuggestion.getOptions().size(), lessThanOrEqualTo(size));
+ int count = 0;
+ for (CompletionSuggestion.Entry.Option option : reducedSuggestion.getOptions()) {
+ assertThat(option.getDoc().doc, equalTo(count));
+ count++;
+ }
+ }
+}
diff --git a/docs/reference/search/suggesters/completion-suggest.asciidoc b/docs/reference/search/suggesters/completion-suggest.asciidoc
index d2d00948ec..82e99650e5 100644
--- a/docs/reference/search/suggesters/completion-suggest.asciidoc
+++ b/docs/reference/search/suggesters/completion-suggest.asciidoc
@@ -181,15 +181,23 @@ returns this response:
"length" : 3,
"options" : [ {
"text" : "Nirvana",
- "score" : 1.0
+ "_index": "music",
+ "_type": "song",
+ "_id": "1",
+ "_score": 1.0,
+ "_source": {
+ "suggest": ["Nevermind", "Nirvana"]
+ }
} ]
} ]
}
--------------------------------------------------
// TESTRESPONSE
-The configured weight for a suggestion is returned as `score`.
-The `text` field uses the `input` of your indexed suggestion.
+The configured weight for a suggestion is returned as `_score`.
+The `text` field uses the `input` of your indexed suggestion. The document
+source is returned in `_source`. <<search-request-source-filtering, source filtering>>
+parameters are supported for filtering the document source.
Suggestions are document oriented, you can specify fields to be
returned as part of suggestion payload. All field types (`string`,
@@ -200,7 +208,7 @@ as follows:
[source,js]
--------------------------------------------------
-POST music/song?refresh
+PUT music/song/2?refresh
{
"suggest" : "Nirvana",
"title" : "Nevermind"
@@ -243,7 +251,14 @@ returns:
"length" : 1,
"options" : [ {
"text" : "Nirvana",
- "score" : 1.0,
+ "_index": "music",
+ "_type": "song",
+ "_id": "2",
+ "_score" : 1.0,
+ "_source": {
+ "title": "Nevermind",
+ "suggest": "Nirvana"
+ },
"payload" : {
"title" : [ "Nevermind" ]
}