summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorJim Ferenczi <jim.ferenczi@elastic.co>2017-02-09 18:06:10 +0100
committerGitHub <noreply@github.com>2017-02-09 18:06:10 +0100
commit94087b3274c1f4c98c100886e4ad9aefd32c4582 (patch)
treeeff79a2934a49256ba6380721e81fb19b75b7ddd /core/src/main/java/org/elasticsearch
parent33915aefd89c736df1c56251365fdf5658229998 (diff)
Removes ExpandCollapseSearchResponseListener, search response listeners and blocking calls
This changes removes the SearchResponseListener that was used by the ExpandCollapseSearchResponseListener to expand collapsed hits. The removal of SearchResponseListener is not a breaking change because it was never released. This change also replace the blocking call in ExpandCollapseSearchResponseListener by a single asynchronous multi search request. The parallelism of the expand request can be set via CollapseBuilder#max_concurrent_group_searches Closes #23048
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java26
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java19
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java9
-rw-r--r--core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java111
-rw-r--r--core/src/main/java/org/elasticsearch/node/Node.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java28
-rw-r--r--core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java33
-rw-r--r--core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java117
8 files changed, 154 insertions, 193 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index 0b5af75674..e40440a491 100644
--- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -559,25 +559,13 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
- // this is only a temporary fix since field collapsing executes a blocking call on response
- // which could be a network thread. we are fixing this but for now we just fork off again.
- // this should be removed once https://github.com/elastic/elasticsearch/issues/23048 is fixed
- getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
- @Override
- public void doRun() throws IOException {
- final boolean isScrollRequest = request.scroll() != null;
- final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs,
- reducedQueryPhase, fetchResultsArr);
- listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
- buildTookInMillis(), buildShardFailures()));
- }
-
- @Override
- public void onFailure(Exception e) {
- raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
- }
- });
+ final boolean isScrollRequest = request.scroll() != null;
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
+ fetchResultsArr);
+ listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
+ buildTookInMillis(), buildShardFailures()));
}
-
}
+
+
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 5445155603..5193fe7278 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -38,6 +38,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -45,8 +47,6 @@ import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
@@ -65,7 +65,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -83,25 +82,11 @@ public class SearchPhaseController extends AbstractComponent {
private final BigArrays bigArrays;
private final ScriptService scriptService;
- private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener;
public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
- this(settings, bigArrays, scriptService, Collections.emptyList());
- }
-
- public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService,
- List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener) {
super(settings);
this.bigArrays = bigArrays;
this.scriptService = scriptService;
- this.searchResponseListener = searchResponseListener;
- }
-
- /**
- * Returns the search response listeners registry
- */
- public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListener() {
- return searchResponseListener;
}
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index 84ae9831c8..e5ff283b25 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -173,6 +173,15 @@ public class SearchTransportService extends AbstractLifecycleComponent {
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
}
+ /**
+ * Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
+ */
+ void sendExecuteMultiSearch(DiscoveryNode node, final MultiSearchRequest request, SearchTask task,
+ final ActionListener<MultiSearchResponse> listener) {
+ transportService.sendChildRequest(transportService.getConnection(node), MultiSearchAction.NAME, request, task,
+ new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
+ }
+
public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index a36e98dcb1..1db6ac6a1a 100644
--- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -36,8 +36,15 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -47,11 +54,11 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@@ -212,16 +219,18 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return connection;
};
+ // Only enrich the search response iff collapsing has been specified:
final ActionListener<SearchResponse> wrapper;
- if (searchPhaseController.getSearchResponseListener().size() > 0) {
+ if (searchRequest.source() != null &&
+ searchRequest.source().collapse() != null &&
+ searchRequest.source().collapse().getInnerHit() != null) {
+
wrapper = ActionListener.wrap(searchResponse -> {
- List<BiConsumer<SearchRequest, SearchResponse>> responseListeners =
- searchPhaseController.getSearchResponseListener();
- for (BiConsumer<SearchRequest, SearchResponse> respListener : responseListeners) {
- respListener.accept(searchRequest, searchResponse);
+ if (searchResponse.getHits().getHits().length == 0) {
+ listener.onResponse(searchResponse);
+ } else {
+ expandCollapsedHits(nodes.getLocalNode(), task, searchRequest, searchResponse, listener);
}
- listener.onResponse(searchResponse);
-
}, listener::onFailure);
} else {
wrapper = listener;
@@ -284,4 +293,90 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
+ "] to a greater value if you really want to query that many shards at the same time.");
}
}
+
+ /**
+ * Expands collapsed using the {@link CollapseBuilder#innerHit} options.
+ */
+ void expandCollapsedHits(DiscoveryNode node,
+ SearchTask parentTask,
+ SearchRequest searchRequest,
+ SearchResponse searchResponse,
+ ActionListener<SearchResponse> finalListener) {
+ CollapseBuilder collapseBuilder = searchRequest.source().collapse();
+ MultiSearchRequest multiRequest = new MultiSearchRequest();
+ if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
+ multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
+ }
+ for (SearchHit hit : searchResponse.getHits()) {
+ BoolQueryBuilder groupQuery = new BoolQueryBuilder();
+ Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
+ if (collapseValue != null) {
+ groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
+ } else {
+ groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
+ }
+ QueryBuilder origQuery = searchRequest.source().query();
+ if (origQuery != null) {
+ groupQuery.must(origQuery);
+ }
+ SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(collapseBuilder.getInnerHit())
+ .query(groupQuery);
+ SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
+ .types(searchRequest.types())
+ .source(sourceBuilder);
+ multiRequest.add(groupRequest);
+ }
+ searchTransportService.sendExecuteMultiSearch(node, multiRequest, parentTask,
+ ActionListener.wrap(response -> {
+ Iterator<MultiSearchResponse.Item> it = response.iterator();
+ for (SearchHit hit : searchResponse.getHits()) {
+ MultiSearchResponse.Item item = it.next();
+ if (item.isFailure()) {
+ finalListener.onFailure(item.getFailure());
+ return;
+ }
+ SearchHits innerHits = item.getResponse().getHits();
+ if (hit.getInnerHits() == null) {
+ hit.setInnerHits(new HashMap<>(1));
+ }
+ hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
+ }
+ finalListener.onResponse(searchResponse);
+ }, finalListener::onFailure)
+ );
+ }
+
+ private SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options) {
+ SearchSourceBuilder groupSource = new SearchSourceBuilder();
+ groupSource.from(options.getFrom());
+ groupSource.size(options.getSize());
+ if (options.getSorts() != null) {
+ options.getSorts().forEach(groupSource::sort);
+ }
+ if (options.getFetchSourceContext() != null) {
+ if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
+ groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
+ } else {
+ groupSource.fetchSource(options.getFetchSourceContext().includes(),
+ options.getFetchSourceContext().excludes());
+ }
+ }
+ if (options.getDocValueFields() != null) {
+ options.getDocValueFields().forEach(groupSource::docValueField);
+ }
+ if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
+ options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
+ }
+ if (options.getScriptFields() != null) {
+ for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
+ groupSource.scriptField(field.fieldName(), field.script());
+ }
+ }
+ if (options.getHighlightBuilder() != null) {
+ groupSource.highlighter(options.getHighlightBuilder());
+ }
+ groupSource.explain(options.isExplain());
+ groupSource.trackScores(options.isTrackScores());
+ return groupSource;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 9279451d47..382423d9a2 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -350,7 +350,7 @@ public class Node implements Closeable {
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
- SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class), client);
+ SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
@@ -451,7 +451,7 @@ public class Node implements Closeable {
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
- scriptModule.getScriptService(), searchModule.getSearchResponseListeners()));
+ scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index 01c4e3488e..8707d851d3 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -22,7 +22,6 @@ package org.elasticsearch.search;
import org.apache.lucene.search.BooleanQuery;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.geo.builders.ShapeBuilders;
@@ -223,7 +222,6 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
-import org.elasticsearch.search.collapse.ExpandCollapseSearchResponseListener;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
@@ -280,17 +278,12 @@ public class SearchModule {
"moving_avg_model");
private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
- private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListeners = new ArrayList<> ();
private final Settings settings;
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
public SearchModule(Settings settings, boolean transportClient, List<SearchPlugin> plugins) {
- this(settings, transportClient, plugins, null);
- }
-
- public SearchModule(Settings settings, boolean transportClient, List<SearchPlugin> plugins, Client client) {
this.settings = settings;
this.transportClient = transportClient;
registerSuggesters(plugins);
@@ -306,9 +299,6 @@ public class SearchModule {
registerPipelineAggregations(plugins);
registerFetchSubPhases(plugins);
registerSearchExts(plugins);
- if (false == transportClient) {
- registerSearchResponseListeners(client, plugins);
- }
registerShapes();
}
@@ -341,13 +331,6 @@ public class SearchModule {
return movingAverageModelParserRegistry;
}
- /**
- * Returns the search response listeners registry
- */
- public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListeners() {
- return searchResponseListeners;
- }
-
private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse)
.addResultReader(InternalAvg::new));
@@ -699,13 +682,6 @@ public class SearchModule {
registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase);
}
- private void registerSearchResponseListeners(Client client, List<SearchPlugin> plugins) {
- if (client != null) {
- registerSearchResponseListener(new ExpandCollapseSearchResponseListener(client));
- }
- registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener);
- }
-
private void registerSearchExts(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getSearchExts, this::registerSearchExt);
}
@@ -791,10 +767,6 @@ public class SearchModule {
(p, c) -> spec.getParser().fromXContent((QueryParseContext) c)));
}
- private void registerSearchResponseListener(BiConsumer<SearchRequest, SearchResponse> listener) {
- searchResponseListeners.add(requireNonNull(listener, "SearchResponseListener must not be null"));
- }
-
public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}
diff --git a/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java
index 75a7d6dadf..542ae2c3ab 100644
--- a/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java
@@ -45,17 +45,20 @@ import java.util.Objects;
public class CollapseBuilder extends ToXContentToBytes implements Writeable {
public static final ParseField FIELD_FIELD = new ParseField("field");
public static final ParseField INNER_HITS_FIELD = new ParseField("inner_hits");
+ public static final ParseField MAX_CONCURRENT_GROUP_REQUESTS_FIELD = new ParseField("max_concurrent_group_searches");
private static final ObjectParser<CollapseBuilder, QueryParseContext> PARSER =
new ObjectParser<>("collapse", CollapseBuilder::new);
static {
PARSER.declareString(CollapseBuilder::setField, FIELD_FIELD);
+ PARSER.declareInt(CollapseBuilder::setMaxConcurrentGroupRequests, MAX_CONCURRENT_GROUP_REQUESTS_FIELD);
PARSER.declareObject(CollapseBuilder::setInnerHits,
(p, c) -> InnerHitBuilder.fromXContent(c), INNER_HITS_FIELD);
}
private String field;
private InnerHitBuilder innerHit;
+ private int maxConcurrentGroupRequests = 0;
private CollapseBuilder() {}
@@ -70,12 +73,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
public CollapseBuilder(StreamInput in) throws IOException {
this.field = in.readString();
+ this.maxConcurrentGroupRequests = in.readVInt();
this.innerHit = in.readOptionalWriteable(InnerHitBuilder::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(field);
+ out.writeVInt(maxConcurrentGroupRequests);
out.writeOptionalWriteable(innerHit);
}
@@ -84,6 +89,7 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return builder;
}
+ // for object parser only
private CollapseBuilder setField(String field) {
if (Strings.isEmpty(field)) {
throw new IllegalArgumentException("field name is null or empty");
@@ -97,6 +103,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return this;
}
+ public CollapseBuilder setMaxConcurrentGroupRequests(int num) {
+ if (num < 1) {
+ throw new IllegalArgumentException("maxConcurrentGroupRequests` must be positive");
+ }
+ this.maxConcurrentGroupRequests = num;
+ return this;
+ }
+
/**
* The name of the field to collapse against
*/
@@ -111,6 +125,13 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return this.innerHit;
}
+ /**
+ * Returns the amount of group requests that are allowed to be ran concurrently in the inner_hits phase.
+ */
+ public int getMaxConcurrentGroupRequests() {
+ return maxConcurrentGroupRequests;
+ }
+
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
@@ -121,6 +142,9 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
private void innerToXContent(XContentBuilder builder) throws IOException {
builder.field(FIELD_FIELD.getPreferredName(), field);
+ if (maxConcurrentGroupRequests > 0) {
+ builder.field(MAX_CONCURRENT_GROUP_REQUESTS_FIELD.getPreferredName(), maxConcurrentGroupRequests);
+ }
if (innerHit != null) {
builder.field(INNER_HITS_FIELD.getPreferredName(), innerHit);
}
@@ -133,13 +157,18 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
CollapseBuilder that = (CollapseBuilder) o;
- if (field != null ? !field.equals(that.field) : that.field != null) return false;
+ if (maxConcurrentGroupRequests != that.maxConcurrentGroupRequests) return false;
+ if (!field.equals(that.field)) return false;
return innerHit != null ? innerHit.equals(that.innerHit) : that.innerHit == null;
+
}
@Override
public int hashCode() {
- return Objects.hash(this.field, this.innerHit);
+ int result = field.hashCode();
+ result = 31 * result + (innerHit != null ? innerHit.hashCode() : 0);
+ result = 31 * result + maxConcurrentGroupRequests;
+ return result;
}
public CollapseContext build(SearchContext context) {
diff --git a/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java b/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java
deleted file mode 100644
index b9caa5216c..0000000000
--- a/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.search.collapse;
-
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.InnerHitBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-
-import java.util.HashMap;
-import java.util.Objects;
-import java.util.function.BiConsumer;
-
-/**
- * A search response listener that intercepts the search response and expands collapsed hits
- * using the {@link CollapseBuilder#innerHit} options.
- */
-public class ExpandCollapseSearchResponseListener implements BiConsumer<SearchRequest, SearchResponse> {
- private final Client client;
-
- public ExpandCollapseSearchResponseListener(Client client) {
- this.client = Objects.requireNonNull(client);
- }
-
- @Override
- public void accept(SearchRequest searchRequest, SearchResponse searchResponse) {
- if (searchRequest.source() == null) {
- return ;
- }
- CollapseBuilder collapseBuilder = searchRequest.source().collapse();
- if (collapseBuilder == null || collapseBuilder.getInnerHit() == null) {
- return ;
- }
- for (SearchHit hit : searchResponse.getHits()) {
- BoolQueryBuilder groupQuery = new BoolQueryBuilder();
- Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
- if (collapseValue != null) {
- groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
- } else {
- groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
- }
- QueryBuilder origQuery = searchRequest.source().query();
- if (origQuery != null) {
- groupQuery.must(origQuery);
- }
- SearchSourceBuilder sourceBuilder = createGroupSearchBuilder(collapseBuilder.getInnerHit())
- .query(groupQuery);
- SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
- .types(searchRequest.types())
- .source(sourceBuilder);
- SearchResponse groupResponse = client.search(groupRequest).actionGet();
- SearchHits innerHits = groupResponse.getHits();
- if (hit.getInnerHits() == null) {
- hit.setInnerHits(new HashMap<>(1));
- }
- hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
- }
- }
-
- private SearchSourceBuilder createGroupSearchBuilder(InnerHitBuilder options) {
- SearchSourceBuilder groupSource = new SearchSourceBuilder();
- groupSource.from(options.getFrom());
- groupSource.size(options.getSize());
- if (options.getSorts() != null) {
- options.getSorts().forEach(groupSource::sort);
- }
- if (options.getFetchSourceContext() != null) {
- if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
- groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
- } else {
- groupSource.fetchSource(options.getFetchSourceContext().includes(),
- options.getFetchSourceContext().excludes());
- }
- }
- if (options.getDocValueFields() != null) {
- options.getDocValueFields().forEach(groupSource::docValueField);
- }
- if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
- options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
- }
- if (options.getScriptFields() != null) {
- for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
- groupSource.scriptField(field.fieldName(), field.script());
- }
- }
- if (options.getHighlightBuilder() != null) {
- groupSource.highlighter(options.getHighlightBuilder());
- }
- groupSource.explain(options.isExplain());
- groupSource.trackScores(options.isTrackScores());
- return groupSource;
- }
-
-}