summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/search
diff options
context:
space:
mode:
authormarkharwood <markharwood@gmail.com>2017-05-24 13:46:43 +0100
committerGitHub <noreply@github.com>2017-05-24 13:46:43 +0100
commitb7197f5e2104e3d67fcd2233264ba39dc4058544 (patch)
treed829baccc590269a8e4a499aff907c52db5352c2 /core/src/main/java/org/elasticsearch/search
parentb5adb3cce9917dda22135b14778fb38cfcc0d7cb (diff)
SignificantText aggregation - like significant_terms, but for text (#24432)
* SignificantText aggregation - like significant_terms, but it doesn’t require fielddata=true. It is recommended for use with the `sampler` agg to limit the expense of tokenizing docs, and it takes an optional `filter_duplicate_text: true` setting to avoid stats skew from repeated sections of text in search results. Closes #23674
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search')
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java3
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java10
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java29
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java386
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java256
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java187
6 files changed, 863 insertions, 8 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index 85308f0c24..fea834c02a 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -132,6 +132,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;
@@ -377,6 +378,8 @@ public class SearchModule {
.addResultReader(SignificantStringTerms.NAME, SignificantStringTerms::new)
.addResultReader(SignificantLongTerms.NAME, SignificantLongTerms::new)
.addResultReader(UnmappedSignificantTerms.NAME, UnmappedSignificantTerms::new));
+ registerAggregation(new AggregationSpec(SignificantTextAggregationBuilder.NAME, SignificantTextAggregationBuilder::new,
+ SignificantTextAggregationBuilder.getParser(significanceHeuristicParserRegistry)));
registerAggregation(new AggregationSpec(RangeAggregationBuilder.NAME, RangeAggregationBuilder::new,
RangeAggregationBuilder::parse).addResultReader(InternalRange::new));
registerAggregation(new AggregationSpec(DateRangeAggregationBuilder.NAME, DateRangeAggregationBuilder::new,
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
index 8b704ee8a6..a7a4a5f843 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
@@ -51,6 +51,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -246,6 +247,15 @@ public class AggregationBuilders {
return new SignificantTermsAggregationBuilder(name, null);
}
+
+ /**
+ * Create a new {@link SignificantTextAggregationBuilder} aggregation with the given name and text field name.
+ *
+ * @param name the name of the aggregation
+ * @param fieldName the name of the text field that will be the subject of the aggregation
+ * @return a new builder constructed from the two arguments
+ */
+ public static SignificantTextAggregationBuilder significantText(String name, String fieldName) {
+ return new SignificantTextAggregationBuilder(name, fieldName);
+ }
+
+
/**
* Create a new {@link DateHistogramAggregationBuilder} aggregation with the given
* name.
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
index 4a04fa645b..a9af38ef6e 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
@@ -55,7 +55,7 @@ public abstract class AggregatorBase extends Aggregator {
private DeferringBucketCollector recordingWrapper;
private final List<PipelineAggregator> pipelineAggregators;
private final CircuitBreakerService breakerService;
- private boolean failed = false;
+ private long requestBytesUsed;
/**
* Constructs a new Aggregator.
@@ -105,16 +105,31 @@ public abstract class AggregatorBase extends Aggregator {
return false; // unreachable
}
};
+ addRequestCircuitBreakerBytes(DEFAULT_WEIGHT);
+ }
+
+ /**
+ * Increment the number of bytes that have been allocated to service this request
+ * and potentially trigger a {@link CircuitBreakingException}. The number of bytes
+ * allocated is automatically decremented with the circuit breaker service on
+ * closure of this aggregator.
+ * For performance reasons subclasses should not call this millions of times
+ * each with small increments and instead batch up into larger allocations.
+ *
+ * @param bytesAllocated the number of additional bytes allocated
+ * @return the cumulative size in bytes allocated by this aggregator to service this request
+ */
+ protected long addRequestCircuitBreakerBytes(long bytesAllocated) {
try {
this.breakerService
.getBreaker(CircuitBreaker.REQUEST)
- .addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, "<agg [" + name + "]>");
+ .addEstimateBytesAndMaybeBreak(bytesAllocated, "<agg [" + name + "]>");
+ this.requestBytesUsed += bytesAllocated;
+ return requestBytesUsed;
} catch (CircuitBreakingException cbe) {
- this.failed = true;
throw cbe;
- }
+ }
}
-
/**
* Most aggregators don't need scores, make sure to extend this method if
* your aggregator needs them.
@@ -265,9 +280,7 @@ public abstract class AggregatorBase extends Aggregator {
try {
doClose();
} finally {
- if (!this.failed) {
- this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT);
- }
+ this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-this.requestBytesUsed);
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java
new file mode 100644
index 0000000000..c0fecd8be4
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ParseFieldRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryParseContext;
+import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationInitializationException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Builder for the {@code significant_text} aggregation. The aggregation
+ * analyzes text loaded from the {@code _source} of the collected documents and
+ * scores the resulting terms with a {@link SignificanceHeuristic}.
+ * Sub-aggregations are rejected.
+ */
+public class SignificantTextAggregationBuilder extends AbstractAggregationBuilder<SignificantTextAggregationBuilder> {
+    public static final String NAME = "significant_text";
+
+    static final ParseField FIELD_NAME = new ParseField("field");
+    static final ParseField SOURCE_FIELDS_NAME = new ParseField("source_fields");
+    static final ParseField FILTER_DUPLICATE_TEXT_FIELD_NAME = new ParseField(
+            "filter_duplicate_text");
+
+    static final TermsAggregator.BucketCountThresholds DEFAULT_BUCKET_COUNT_THRESHOLDS =
+            SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS;
+    static final SignificanceHeuristic DEFAULT_SIGNIFICANCE_HEURISTIC = SignificantTermsAggregationBuilder.DEFAULT_SIGNIFICANCE_HEURISTIC;
+
+    private String fieldName = null;
+    private String [] sourceFieldNames = null;
+    private boolean filterDuplicateText = false;
+    private IncludeExclude includeExclude = null;
+    private QueryBuilder filterBuilder = null;
+    // Private copy of the defaults so that setters on this builder never mutate the shared instance
+    private TermsAggregator.BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(
+            DEFAULT_BUCKET_COUNT_THRESHOLDS);
+    private SignificanceHeuristic significanceHeuristic = DEFAULT_SIGNIFICANCE_HEURISTIC;
+
+    /**
+     * Creates the request parser for this aggregation.
+     *
+     * @param significanceHeuristicParserRegistry
+     *            registry of heuristic parsers; every registered name is
+     *            declared as an object field whose content is parsed into the
+     *            {@link SignificanceHeuristic} to use
+     * @return a parser that produces a {@link SignificantTextAggregationBuilder}
+     *         carrying the aggregation name it is given
+     */
+    public static Aggregator.Parser getParser(
+            ParseFieldRegistry<SignificanceHeuristicParser> significanceHeuristicParserRegistry) {
+        ObjectParser<SignificantTextAggregationBuilder, QueryParseContext> parser = new ObjectParser<>(
+                SignificantTextAggregationBuilder.NAME);
+
+        parser.declareInt(SignificantTextAggregationBuilder::shardSize,
+                TermsAggregationBuilder.SHARD_SIZE_FIELD_NAME);
+
+        parser.declareLong(SignificantTextAggregationBuilder::minDocCount,
+                TermsAggregationBuilder.MIN_DOC_COUNT_FIELD_NAME);
+
+        parser.declareLong(SignificantTextAggregationBuilder::shardMinDocCount,
+                TermsAggregationBuilder.SHARD_MIN_DOC_COUNT_FIELD_NAME);
+
+        parser.declareInt(SignificantTextAggregationBuilder::size,
+                TermsAggregationBuilder.REQUIRED_SIZE_FIELD_NAME);
+
+        parser.declareString(SignificantTextAggregationBuilder::fieldName, FIELD_NAME);
+
+        parser.declareStringArray(SignificantTextAggregationBuilder::sourceFieldNames, SOURCE_FIELDS_NAME);
+
+        parser.declareBoolean(SignificantTextAggregationBuilder::filterDuplicateText,
+                FILTER_DUPLICATE_TEXT_FIELD_NAME);
+
+        parser.declareObject(SignificantTextAggregationBuilder::backgroundFilter,
+                (p, context) -> context.parseInnerQueryBuilder(),
+                SignificantTermsAggregationBuilder.BACKGROUND_FILTER);
+
+        // include and exclude are parsed separately and merged into a single IncludeExclude
+        parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(v, b.includeExclude())),
+                IncludeExclude::parseInclude, IncludeExclude.INCLUDE_FIELD,
+                ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
+
+        parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(b.includeExclude(), v)),
+                IncludeExclude::parseExclude, IncludeExclude.EXCLUDE_FIELD,
+                ObjectParser.ValueType.STRING_ARRAY);
+
+        for (String name : significanceHeuristicParserRegistry.getNames()) {
+            parser.declareObject(SignificantTextAggregationBuilder::significanceHeuristic,
+                    (p, context) -> {
+                        SignificanceHeuristicParser significanceHeuristicParser = significanceHeuristicParserRegistry
+                                .lookupReturningNullIfNotFound(name);
+                        return significanceHeuristicParser.parse(context);
+                    }, new ParseField(name));
+        }
+        // The field name is not known up front; it is filled in by the "field" declaration above
+        return (aggregationName, context) -> parser.parse(context.parser(),
+                new SignificantTextAggregationBuilder(aggregationName, null), context);
+    }
+
+    /**
+     * Returns a copy of the bucket count thresholds of this builder.
+     */
+    protected TermsAggregator.BucketCountThresholds getBucketCountThresholds() {
+        return new TermsAggregator.BucketCountThresholds(bucketCountThresholds);
+    }
+
+    /**
+     * Returns the bucket count thresholds instance held by this builder (not a copy).
+     */
+    public TermsAggregator.BucketCountThresholds bucketCountThresholds() {
+        return bucketCountThresholds;
+    }
+
+    /**
+     * Always fails: this aggregation does not accept sub-aggregations.
+     *
+     * @throws AggregationInitializationException always
+     */
+    @Override
+    public SignificantTextAggregationBuilder subAggregations(Builder subFactories) {
+        throw subAggregationsNotSupported();
+    }
+
+    /**
+     * Always fails: this aggregation does not accept sub-aggregations.
+     *
+     * @throws AggregationInitializationException always
+     */
+    @Override
+    public SignificantTextAggregationBuilder subAggregation(AggregationBuilder aggregation) {
+        throw subAggregationsNotSupported();
+    }
+
+    /** Creates the exception thrown when a caller tries to attach sub-aggregations. */
+    private AggregationInitializationException subAggregationsNotSupported() {
+        return new AggregationInitializationException("Aggregator [" + name + "] of type ["
+                + getType() + "] cannot accept sub-aggregations");
+    }
+
+    /**
+     * Replaces the bucket count thresholds of this builder.
+     *
+     * @throws IllegalArgumentException if {@code bucketCountThresholds} is null
+     */
+    public SignificantTextAggregationBuilder bucketCountThresholds(
+            TermsAggregator.BucketCountThresholds bucketCountThresholds) {
+        if (bucketCountThresholds == null) {
+            throw new IllegalArgumentException(
+                    "[bucketCountThresholds] must not be null: [" + name + "]");
+        }
+        this.bucketCountThresholds = bucketCountThresholds;
+        return this;
+    }
+
+    /**
+     * Sets the size - indicating how many term buckets should be returned
+     * (defaults to 10)
+     *
+     * @throws IllegalArgumentException if {@code size} is not greater than 0
+     */
+    public SignificantTextAggregationBuilder size(int size) {
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    "[size] must be greater than 0. Found [" + size + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setRequiredSize(size);
+        return this;
+    }
+
+    /**
+     * Sets the shard_size - indicating the number of term buckets each shard
+     * will return to the coordinating node (the node that coordinates the
+     * search execution). The higher the shard size is, the more accurate the
+     * results are.
+     *
+     * @throws IllegalArgumentException if {@code shardSize} is not greater than 0
+     */
+    public SignificantTextAggregationBuilder shardSize(int shardSize) {
+        if (shardSize <= 0) {
+            throw new IllegalArgumentException("[shardSize] must be greater than 0. Found ["
+                    + shardSize + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setShardSize(shardSize);
+        return this;
+    }
+
+    /**
+     * Sets the name of the text field that will be the subject of this
+     * aggregation.
+     */
+    public SignificantTextAggregationBuilder fieldName(String fieldName) {
+        this.fieldName = fieldName;
+        return this;
+    }
+
+    /**
+     * Selects the fields to load from _source JSON and analyze.
+     * If none are specified, the indexed "fieldName" value is assumed
+     * to also be the name of the JSON field holding the value
+     *
+     * @throws IllegalArgumentException if {@code names} is null
+     */
+    public SignificantTextAggregationBuilder sourceFieldNames(List<String> names) {
+        if (names == null) {
+            throw new IllegalArgumentException(
+                    "[sourceFieldNames] must not be null: [" + name + "]");
+        }
+        this.sourceFieldNames = names.toArray(new String[0]);
+        return this;
+    }
+
+    /**
+     * Controls whether duplicate paragraphs of text should be filtered, where
+     * possible, from the statistical text analysis. Can improve results but
+     * slows down analysis. Default is false.
+     */
+    public SignificantTextAggregationBuilder filterDuplicateText(boolean filterDuplicateText) {
+        this.filterDuplicateText = filterDuplicateText;
+        return this;
+    }
+
+    /**
+     * Set the minimum document count terms should have in order to appear in
+     * the response.
+     *
+     * @throws IllegalArgumentException if {@code minDocCount} is negative
+     */
+    public SignificantTextAggregationBuilder minDocCount(long minDocCount) {
+        if (minDocCount < 0) {
+            throw new IllegalArgumentException(
+                    "[minDocCount] must be greater than or equal to 0. Found [" + minDocCount
+                            + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setMinDocCount(minDocCount);
+        return this;
+    }
+
+    /**
+     * Set the minimum document count terms should have on the shard in order to
+     * appear in the response.
+     *
+     * @throws IllegalArgumentException if {@code shardMinDocCount} is negative
+     */
+    public SignificantTextAggregationBuilder shardMinDocCount(long shardMinDocCount) {
+        if (shardMinDocCount < 0) {
+            throw new IllegalArgumentException(
+                    "[shardMinDocCount] must be greater than or equal to 0. Found ["
+                            + shardMinDocCount + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setShardMinDocCount(shardMinDocCount);
+        return this;
+    }
+
+    /**
+     * Sets the query passed to the aggregator factory as the background filter.
+     *
+     * @throws IllegalArgumentException if {@code backgroundFilter} is null
+     */
+    public SignificantTextAggregationBuilder backgroundFilter(QueryBuilder backgroundFilter) {
+        if (backgroundFilter == null) {
+            throw new IllegalArgumentException(
+                    "[backgroundFilter] must not be null: [" + name + "]");
+        }
+        this.filterBuilder = backgroundFilter;
+        return this;
+    }
+
+    /**
+     * Returns the background filter, or {@code null} if none has been set.
+     */
+    public QueryBuilder backgroundFilter() {
+        return filterBuilder;
+    }
+
+    /**
+     * Set terms to include and exclude from the aggregation results
+     */
+    public SignificantTextAggregationBuilder includeExclude(IncludeExclude includeExclude) {
+        this.includeExclude = includeExclude;
+        return this;
+    }
+
+    /**
+     * Get terms to include and exclude from the aggregation results
+     */
+    public IncludeExclude includeExclude() {
+        return includeExclude;
+    }
+
+    /**
+     * Sets the heuristic used to score terms.
+     *
+     * @throws IllegalArgumentException if {@code significanceHeuristic} is null
+     */
+    public SignificantTextAggregationBuilder significanceHeuristic(
+            SignificanceHeuristic significanceHeuristic) {
+        if (significanceHeuristic == null) {
+            throw new IllegalArgumentException(
+                    "[significanceHeuristic] must not be null: [" + name + "]");
+        }
+        this.significanceHeuristic = significanceHeuristic;
+        return this;
+    }
+
+    /**
+     * Returns the heuristic used to score terms.
+     */
+    public SignificanceHeuristic significanceHeuristic() {
+        return significanceHeuristic;
+    }
+
+    /**
+     * @param name
+     *            the name of this aggregation
+     * @param fieldName
+     *            the name of the text field that will be the subject of this
+     *            aggregation
+     *
+     */
+    public SignificantTextAggregationBuilder(String name, String fieldName) {
+        super(name);
+        this.fieldName = fieldName;
+    }
+
+    /**
+     * Read from a stream. Fields are read in exactly the order in which
+     * {@link #doWriteTo(StreamOutput)} writes them.
+     */
+    public SignificantTextAggregationBuilder(StreamInput in) throws IOException {
+        super(in);
+        fieldName = in.readString();
+        filterDuplicateText = in.readBoolean();
+        bucketCountThresholds = new BucketCountThresholds(in);
+        filterBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
+        includeExclude = in.readOptionalWriteable(IncludeExclude::new);
+        significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
+        sourceFieldNames = in.readOptionalStringArray();
+    }
+
+    /**
+     * Writes the state of this builder; the order must stay in sync with the
+     * stream constructor.
+     */
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeString(fieldName);
+        out.writeBoolean(filterDuplicateText);
+        bucketCountThresholds.writeTo(out);
+        out.writeOptionalNamedWriteable(filterBuilder);
+        out.writeOptionalWriteable(includeExclude);
+        out.writeNamedWriteable(significanceHeuristic);
+        out.writeOptionalStringArray(sourceFieldNames);
+    }
+
+    /**
+     * Creates the aggregator factory, using the rewritten significance
+     * heuristic for execution.
+     */
+    @Override
+    protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<?> parent,
+            Builder subFactoriesBuilder) throws IOException {
+        SignificanceHeuristic executionHeuristic = this.significanceHeuristic.rewrite(context);
+        // Without explicit source fields the indexed field name doubles as the _source field name
+        String[] execFieldNames = sourceFieldNames;
+        if (execFieldNames == null) {
+            execFieldNames = new String[] { fieldName };
+        }
+        return new SignificantTextAggregatorFactory(name, includeExclude, filterBuilder,
+                bucketCountThresholds, executionHeuristic, context, parent, subFactoriesBuilder,
+                fieldName, execFieldNames, filterDuplicateText, metaData);
+    }
+
+    /**
+     * Renders this builder as an object. Unset optional values are omitted, and
+     * {@code filter_duplicate_text} is only written when it is {@code true}.
+     */
+    @Override
+    protected XContentBuilder internalXContent(XContentBuilder builder, Params params)
+            throws IOException {
+        builder.startObject();
+        bucketCountThresholds.toXContent(builder, params);
+        if (fieldName != null) {
+            builder.field(FIELD_NAME.getPreferredName(), fieldName);
+        }
+        if (sourceFieldNames != null) {
+            builder.array(SOURCE_FIELDS_NAME.getPreferredName(), sourceFieldNames);
+        }
+
+        if (filterDuplicateText) {
+            builder.field(FILTER_DUPLICATE_TEXT_FIELD_NAME.getPreferredName(), filterDuplicateText);
+        }
+        if (filterBuilder != null) {
+            builder.field(SignificantTermsAggregationBuilder.BACKGROUND_FILTER.getPreferredName(),
+                    filterBuilder);
+        }
+        if (includeExclude != null) {
+            includeExclude.toXContent(builder, params);
+        }
+        significanceHeuristic.toXContent(builder, params);
+
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    protected int doHashCode() {
+        // The array is hashed by content so that the hash agrees with doEquals
+        return Objects.hash(bucketCountThresholds, fieldName, filterDuplicateText, filterBuilder,
+                includeExclude, significanceHeuristic, Arrays.hashCode(sourceFieldNames));
+    }
+
+    @Override
+    protected boolean doEquals(Object obj) {
+        SignificantTextAggregationBuilder other = (SignificantTextAggregationBuilder) obj;
+        return Objects.equals(bucketCountThresholds, other.bucketCountThresholds)
+                && Objects.equals(fieldName, other.fieldName)
+                && Arrays.equals(sourceFieldNames, other.sourceFieldNames)
+                && filterDuplicateText == other.filterDuplicateText
+                && Objects.equals(filterBuilder, other.filterBuilder)
+                && Objects.equals(includeExclude, other.includeExclude)
+                && Objects.equals(significanceHeuristic, other.significanceHeuristic);
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java
new file mode 100644
index 0000000000..c7539a4ca0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
+import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.search.highlight.TokenStreamFromTermVector;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude.StringFilter;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.lookup.SourceLookup;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+
+public class SignificantTextAggregator extends BucketsAggregator {
+
+ private final StringFilter includeExclude;
+ protected final BucketCountThresholds bucketCountThresholds;
+ protected long numCollectedDocs;
+ private final BytesRefHash bucketOrds;
+ private final SignificanceHeuristic significanceHeuristic;
+ private SignificantTextAggregatorFactory termsAggFactory;
+ private final DocValueFormat format = DocValueFormat.RAW;
+ private final String fieldName;
+ private final String[] sourceFieldNames;
+ private DuplicateByteSequenceSpotter dupSequenceSpotter = null ;
+ private long lastTrieSize;
+ private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
+
+
+
+    /**
+     * Creates the aggregator and allocates the hash that maps terms to bucket ordinals.
+     *
+     * @param filterDuplicateText when {@code true}, a {@link DuplicateByteSequenceSpotter}
+     *            is created and its current estimated size is recorded as the baseline
+     *            for later memory-growth reporting
+     */
+    public SignificantTextAggregator(String name, AggregatorFactories factories,
+            SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
+            BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude,
+            SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory,
+            String fieldName, String [] sourceFieldNames, boolean filterDuplicateText,
+            Map<String, Object> metaData) throws IOException {
+        super(name, factories, context, parent, pipelineAggregators, metaData);
+        // What to analyze
+        this.fieldName = fieldName;
+        this.sourceFieldNames = sourceFieldNames;
+        // How to filter, score and size the results
+        this.includeExclude = includeExclude;
+        this.bucketCountThresholds = bucketCountThresholds;
+        this.significanceHeuristic = significanceHeuristic;
+        this.termsAggFactory = termsAggFactory;
+        this.bucketOrds = new BytesRefHash(1, context.bigArrays());
+        if (filterDuplicateText) {
+            final DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
+            this.dupSequenceSpotter = spotter;
+            this.lastTrieSize = spotter.getEstimatedSizeInBytes();
+        }
+    }
+
+
+
+
+    /**
+     * Returns a collector that, for every collected document, loads the configured
+     * fields from {@code _source}, analyzes their text with the index analyzer of
+     * {@code fieldName} and counts the document once against the bucket of each
+     * distinct term accepted by the include/exclude filter.
+     */
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+            final LeafBucketCollector sub) throws IOException {
+        // Scratch buffer reused for every token to avoid per-token allocations
+        final BytesRefBuilder previous = new BytesRefBuilder();
+        return new LeafBucketCollectorBase(sub, null) {
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                collectFromSource(doc, bucket, fieldName, sourceFieldNames);
+                numCollectedDocs++;
+                if (dupSequenceSpotter != null) {
+                    dupSequenceSpotter.startNewSequence();
+                }
+            }
+
+            /**
+             * Consumes one token stream and collects the document into the bucket of
+             * every term that has not yet been seen in this document.
+             *
+             * @param inDocTerms the terms already counted for the current document;
+             *            shared by all token streams of that document
+             */
+            private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms)
+                    throws IOException {
+                if (dupSequenceSpotter != null) {
+                    ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
+                }
+                CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
+                try {
+                    // reset() sits inside the try so the stream is closed even if reset() fails
+                    ts.reset();
+                    while (ts.incrementToken()) {
+                        if (dupSequenceSpotter != null) {
+                            long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
+                            long growth = newTrieSize - lastTrieSize;
+                            // Only update the circuit breaker after the trie has grown by more than
+                            // the reporting interval, to avoid a flood of tiny increments
+                            if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
+                                addRequestCircuitBreakerBytes(growth);
+                                lastTrieSize = newTrieSize;
+                            }
+                        }
+                        previous.clear();
+                        previous.copyChars(termAtt);
+                        BytesRef bytes = previous.get();
+                        // add() returns a negative value when the term was already seen in this doc
+                        if (inDocTerms.add(bytes) >= 0) {
+                            if (includeExclude == null || includeExclude.accept(bytes)) {
+                                long bucketOrdinal = bucketOrds.add(bytes);
+                                if (bucketOrdinal < 0) { // already seen
+                                    bucketOrdinal = -1 - bucketOrdinal;
+                                    collectExistingBucket(sub, doc, bucketOrdinal);
+                                } else {
+                                    collectBucket(sub, doc, bucketOrdinal);
+                                }
+                            }
+                        }
+                    }
+                    // Complete the TokenStream consumer workflow before closing
+                    ts.end();
+                } finally {
+                    ts.close();
+                }
+            }
+
+            /**
+             * Loads the source fields of one document and feeds every value through the
+             * index analyzer of {@code indexedFieldName}.
+             *
+             * @throws IllegalArgumentException if {@code indexedFieldName} has no mapping
+             */
+            private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames)
+                    throws IOException {
+                MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
+                if (fieldType == null) {
+                    throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName
+                            +"] since it is not present");
+                }
+
+                SourceLookup sourceLookup = context.lookup().source();
+                sourceLookup.setSegmentAndDocument(ctx, doc);
+                Analyzer analyzer = fieldType.indexAnalyzer();
+
+                // The set of seen terms must span every value of every source field of the
+                // document: a term repeated across values would otherwise be counted more than
+                // once for the same document and inflate the bucket's doc count.
+                // 256 is only an initial capacity; the hash grows on demand.
+                BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays());
+                try {
+                    for (String sourceField : sourceFieldNames) {
+                        List<Object> textsToAnalyze = sourceLookup.extractRawValues(sourceField);
+                        textsToAnalyze = textsToAnalyze.stream().map(obj -> {
+                            if (obj instanceof BytesRef) {
+                                return fieldType.valueForDisplay(obj).toString();
+                            } else {
+                                return obj;
+                            }
+                        }).collect(Collectors.toList());
+
+                        for (Object fieldValue : textsToAnalyze) {
+                            String fieldText = fieldValue.toString();
+                            TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
+                            processTokenStream(doc, bucket, ts, inDocTerms);
+                        }
+                    }
+                } finally {
+                    Releasables.close(inDocTerms);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the shard-level result: scores every collected term that meets the
+     * shard minimum doc count and keeps the best-scoring ones, up to the shard size.
+     */
+    @Override
+    public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
+        assert owningBucketOrdinal == 0;
+
+        final long termCount = bucketOrds.size();
+        final int queueSize = (int) Math.min(termCount, bucketCountThresholds.getShardSize());
+        final long supersetSize = termsAggFactory.getSupersetNumDocs();
+        final long subsetSize = numCollectedDocs;
+        final long shardMinDocCount = bucketCountThresholds.getShardMinDocCount();
+
+        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> topBuckets = new BucketSignificancePriorityQueue<>(queueSize);
+        // Reusable candidate: the queue hands back whichever bucket it rejected or evicted
+        SignificantStringTerms.Bucket candidate = null;
+        for (int ord = 0; ord < termCount; ord++) {
+            final int docCount = bucketDocCount(ord);
+            if (docCount >= shardMinDocCount) {
+                if (candidate == null) {
+                    candidate = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+                }
+
+                bucketOrds.get(ord, candidate.termBytes);
+                candidate.subsetDf = docCount;
+                candidate.subsetSize = subsetSize;
+                candidate.supersetDf = termsAggFactory.getBackgroundFrequency(candidate.termBytes);
+                candidate.supersetSize = supersetSize;
+                // The subset/superset stats used for this shard-local down-selection
+                // cover this shard only; the central reducer later replaces them
+                // with global stats
+                candidate.updateScore(significanceHeuristic);
+
+                candidate.bucketOrd = ord;
+                candidate = topBuckets.insertWithOverflow(candidate);
+            }
+        }
+
+        // Popping the queue fills the result array from its last slot back to its first
+        final SignificantStringTerms.Bucket[] results = new SignificantStringTerms.Bucket[topBuckets.size()];
+        int slot = results.length;
+        while (slot > 0) {
+            final SignificantStringTerms.Bucket bucket = topBuckets.pop();
+            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
+            bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
+            bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+            results[--slot] = bucket;
+        }
+
+        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(), pipelineAggregators(),
+                metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(results));
+    }
+
+
+    /**
+     * Builds a result without buckets. The subset size is 0 and the superset size is
+     * the number of documents in the top-level index reader.
+     */
+    @Override
+    public SignificantStringTerms buildEmptyAggregation() {
+        // A miss still has to be weighed in the global stats, so the corpus size is supplied as context
+        final ContextIndexSearcher indexSearcher = context.searcher();
+        final IndexReader reader = indexSearcher.getIndexReader();
+        final int corpusDocCount = reader.numDocs();
+        final int requiredSize = bucketCountThresholds.getRequiredSize();
+        final long minDocCount = bucketCountThresholds.getMinDocCount();
+        return new SignificantStringTerms(name, requiredSize, minDocCount,
+                pipelineAggregators(), metaData(), format, 0, corpusDocCount, significanceHeuristic, emptyList());
+    }
+
+ @Override
+ public void doClose() {
+ // Release the term-to-bucket-ordinal hash together with the factory in a single call
+ Releasables.close(bucketOrds, termsAggFactory);
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java
new file mode 100644
index 0000000000..629f698994
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
+import org.elasticsearch.common.lucene.index.FreqTermsEnum;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.BucketUtils;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory that creates {@link SignificantTextAggregator} instances and answers
+ * background (superset) frequency lookups for the terms they collect.
+ * <p>
+ * The factory lazily creates a single terms enum that is reused for all
+ * background frequency lookups; it is released in {@link #close()}.
+ */
+public class SignificantTextAggregatorFactory extends AggregatorFactory<SignificantTextAggregatorFactory>
+        implements Releasable {
+
+    private final IncludeExclude includeExclude;
+    private final String indexedFieldName;
+    private final MappedFieldType fieldType;
+    private final String[] sourceFieldNames;
+    // Created on first use by getTermsEnum and reused until close() is called
+    private FilterableTermsEnum termsEnum;
+    // Incremented for every aggregator built by createInternal
+    private int numberOfAggregatorsCreated;
+    // Optional background filter; null means "no filter"
+    private final Query filter;
+    private final int supersetNumDocs;
+    private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
+    private final SignificanceHeuristic significanceHeuristic;
+    private final DocValueFormat format = DocValueFormat.RAW;
+    private final boolean filterDuplicateText;
+
+    /**
+     * Creates the factory, resolving the optional background filter to a
+     * query and computing the superset document count up front.
+     *
+     * @param filterBuilder optional background filter; may be {@code null}
+     * @param fieldName name of the indexed field used for term lookups
+     * @param sourceFieldNames passed through unchanged to the created aggregators
+     * @param filterDuplicateText passed through unchanged to the created aggregators
+     * @throws IOException if building the filter query or counting its matches fails
+     */
+    public SignificantTextAggregatorFactory(String name, IncludeExclude includeExclude,
+            QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
+            SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
+            AggregatorFactories.Builder subFactoriesBuilder, String fieldName, String[] sourceFieldNames,
+            boolean filterDuplicateText, Map<String, Object> metaData) throws IOException {
+        super(name, context, parent, subFactoriesBuilder, metaData);
+        this.includeExclude = includeExclude;
+        this.filter = filterBuilder == null
+                ? null
+                : filterBuilder.toQuery(context.getQueryShardContext());
+        this.indexedFieldName = fieldName;
+        this.sourceFieldNames = sourceFieldNames;
+        this.filterDuplicateText = filterDuplicateText;
+        IndexSearcher searcher = context.searcher();
+        // Important - need to use the doc count that includes deleted docs
+        // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
+        this.supersetNumDocs = filter == null
+                ? searcher.getIndexReader().maxDoc()
+                : searcher.count(filter);
+        this.bucketCountThresholds = bucketCountThresholds;
+        this.significanceHeuristic = significanceHeuristic;
+        this.fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
+    }
+
+    /**
+     * Get the number of docs in the superset.
+     */
+    public long getSupersetNumDocs() {
+        return supersetNumDocs;
+    }
+
+    /**
+     * Returns the shared terms enum, creating it on first use.
+     * Both variants are built over the requested {@code field} so that the
+     * enum always matches the field of the term being looked up.
+     *
+     * @param field the field whose terms are enumerated
+     * @throws IOException if the terms enum cannot be created
+     */
+    private FilterableTermsEnum getTermsEnum(String field) throws IOException {
+        if (termsEnum != null) {
+            return termsEnum;
+        }
+        IndexReader reader = context.searcher().getIndexReader();
+        if (numberOfAggregatorsCreated > 1) {
+            // Several aggregators share this factory, so use the FreqTermsEnum variant
+            termsEnum = new FreqTermsEnum(reader, field, true, false, filter, context.bigArrays());
+        } else {
+            termsEnum = new FilterableTermsEnum(reader, field, PostingsEnum.NONE, filter);
+        }
+        return termsEnum;
+    }
+
+    /**
+     * Returns how many background documents contain the given value,
+     * restricted to the background filter when one is configured.
+     *
+     * @throws IOException if the index lookup fails
+     */
+    private long getBackgroundFrequency(String value) throws IOException {
+        Query query = fieldType.termQuery(value, context.getQueryShardContext());
+        if (query instanceof TermQuery) {
+            // for types that use the inverted index, we prefer using a caching terms
+            // enum that will do a better job at reusing index inputs
+            Term term = ((TermQuery) query).getTerm();
+            FilterableTermsEnum backgroundTerms = getTermsEnum(term.field());
+            return backgroundTerms.seekExact(term.bytes()) ? backgroundTerms.docFreq() : 0;
+        }
+        // otherwise do it the naive way
+        if (filter != null) {
+            query = new BooleanQuery.Builder()
+                    .add(query, Occur.FILTER)
+                    .add(filter, Occur.FILTER)
+                    .build();
+        }
+        return context.searcher().count(query);
+    }
+
+    /**
+     * Returns how many background documents contain the given term.
+     * The term bytes are first converted to a string with this factory's
+     * {@link DocValueFormat}.
+     *
+     * @throws IOException if the index lookup fails
+     */
+    public long getBackgroundFrequency(BytesRef termBytes) throws IOException {
+        String value = format.format(termBytes);
+        return getBackgroundFrequency(value);
+    }
+
+    /**
+     * Releases the cached terms enum, if it holds releasable resources, and
+     * forgets it so a later lookup creates a fresh one.
+     */
+    @Override
+    public void close() {
+        try {
+            if (termsEnum instanceof Releasable) {
+                ((Releasable) termsEnum).close();
+            }
+        } finally {
+            termsEnum = null;
+        }
+    }
+
+    /**
+     * Creates a {@link SignificantTextAggregator}, or a multi-bucket wrapper
+     * around this factory when the aggregator would have to collect from
+     * more than one parent bucket.
+     */
+    @Override
+    protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+            throws IOException {
+        if (collectsFromSingleBucket == false) {
+            return asMultiBucketAggregator(this, context, parent);
+        }
+
+        numberOfAggregatorsCreated++;
+        // Work on a copy so the shard size adjustment below does not leak into the shared thresholds
+        BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
+        if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
+            // The user has not made a shardSize selection.
+            // Use default heuristic to avoid any wrong-ranking caused by
+            // distributed counting but request double the usual amount.
+            // We typically need more than the number of "top" terms requested
+            // by other aggregations as the significance algorithm is in less
+            // of a position to down-select at shard-level - some of the things
+            // we want to find have only one occurrence on each shard and as
+            // such are impossible to differentiate from non-significant terms
+            // at that early stage.
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
+                    context.numberOfShards()));
+        }
+
+        // TODO - need to check with mapping that this is indeed a text field....
+
+        IncludeExclude.StringFilter incExcFilter = includeExclude == null
+                ? null
+                : includeExclude.convertToStringFilter(format);
+
+        return new SignificantTextAggregator(name, factories, context, parent, pipelineAggregators, bucketCountThresholds,
+                incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metaData);
+    }
+}