author      markharwood <markharwood@gmail.com>    2017-05-24 13:46:43 +0100
committer   GitHub <noreply@github.com>            2017-05-24 13:46:43 +0100
commit      b7197f5e2104e3d67fcd2233264ba39dc4058544 (patch)
tree        d829baccc590269a8e4a499aff907c52db5352c2 /core/src/main/java/org/elasticsearch/search
parent      b5adb3cce9917dda22135b14778fb38cfcc0d7cb (diff)
SignificantText aggregation - like significant_terms, but for text (#24432)
* SignificantText aggregation - like significant_terms but doesn't require fielddata=true. It is recommended for use with the `sampler` agg to limit the expense of tokenizing docs, and it takes an optional `filter_duplicate_text: true` setting to avoid stats skew from repeated sections of text in search results.
Closes #23674
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search')
core/src/main/java/org/elasticsearch/search/SearchModule.java                                                      |   3 +
core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java                                  |  10 +
core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java                                       |  29 +-
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java | 386 +++++++
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java         | 256 ++++++
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java  | 187 +++++
6 files changed, 863 insertions(+), 8 deletions(-)
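Usage sketch (not part of this commit): one way the new builder and the pre-existing `sampler` agg might be combined from the Java API, per the commit message's recommendation. The index query, field, and agg names here ("body", "bird flu", "keywords", "sample") are hypothetical.

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class SignificantTextExample {
        public static SearchSourceBuilder buildRequest() {
            // significant_text re-analyzes free text pulled from _source, so wrap it
            // in a sampler agg to bound how many docs per shard get tokenized.
            return new SearchSourceBuilder()
                    .query(QueryBuilders.matchQuery("body", "bird flu")) // hypothetical foreground query
                    .aggregation(
                            AggregationBuilders.sampler("sample")
                                    .shardSize(100) // only the top 100 matches per shard are tokenized
                                    .subAggregation(
                                            AggregationBuilders.significantText("keywords", "body")
                                                    .filterDuplicateText(true))); // avoid skew from repeated text
        }
    }

Wrapping in `sampler` matters because tokenizing stored text is the expensive part of this aggregation; sampling caps that cost at the shard level.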
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index 85308f0c24..fea834c02a 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -132,6 +132,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
 import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;
@@ -377,6 +378,8 @@ public class SearchModule {
                 .addResultReader(SignificantStringTerms.NAME, SignificantStringTerms::new)
                 .addResultReader(SignificantLongTerms.NAME, SignificantLongTerms::new)
                 .addResultReader(UnmappedSignificantTerms.NAME, UnmappedSignificantTerms::new));
+        registerAggregation(new AggregationSpec(SignificantTextAggregationBuilder.NAME, SignificantTextAggregationBuilder::new,
+                SignificantTextAggregationBuilder.getParser(significanceHeuristicParserRegistry)));
         registerAggregation(new AggregationSpec(RangeAggregationBuilder.NAME, RangeAggregationBuilder::new,
                 RangeAggregationBuilder::parse).addResultReader(InternalRange::new));
         registerAggregation(new AggregationSpec(DateRangeAggregationBuilder.NAME, DateRangeAggregationBuilder::new,
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
index 8b704ee8a6..a7a4a5f843 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
@@ -51,6 +51,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -246,6 +247,15 @@ public class AggregationBuilders {
         return new SignificantTermsAggregationBuilder(name, null);
     }
 
+    /**
+     * Create a new {@link SignificantTextAggregationBuilder} aggregation with the given name and text field name
+     */
+    public static SignificantTextAggregationBuilder significantText(String name, String fieldName) {
+        return new SignificantTextAggregationBuilder(name, fieldName);
+    }
+
+
     /**
      * Create a new {@link DateHistogramAggregationBuilder} aggregation with the given
      * name.
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
index 4a04fa645b..a9af38ef6e 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
@@ -55,7 +55,7 @@ public abstract class AggregatorBase extends Aggregator {
     private DeferringBucketCollector recordingWrapper;
     private final List<PipelineAggregator> pipelineAggregators;
     private final CircuitBreakerService breakerService;
-    private boolean failed = false;
+    private long requestBytesUsed;
 
     /**
      * Constructs a new Aggregator.
@@ -105,16 +105,31 @@ public abstract class AggregatorBase extends Aggregator {
                 return false; // unreachable
             }
         };
+        addRequestCircuitBreakerBytes(DEFAULT_WEIGHT);
+    }
+
+    /**
+     * Increment the number of bytes that have been allocated to service this request
+     * and potentially trigger a {@link CircuitBreakingException}. The number of bytes
+     * allocated is automatically decremented with the circuit breaker service on
+     * closure of this aggregator.
+     * For performance reasons subclasses should not call this millions of times
+     * each with small increments and instead batch up into larger allocations.
+     *
+     * @param bytesAllocated the number of additional bytes allocated
+     * @return the cumulative size in bytes allocated by this aggregator to service this request
+     */
+    protected long addRequestCircuitBreakerBytes(long bytesAllocated) {
         try {
             this.breakerService
                     .getBreaker(CircuitBreaker.REQUEST)
-                    .addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, "<agg [" + name + "]>");
+                    .addEstimateBytesAndMaybeBreak(bytesAllocated, "<agg [" + name + "]>");
+            this.requestBytesUsed += bytesAllocated;
+            return requestBytesUsed;
         } catch (CircuitBreakingException cbe) {
-            this.failed = true;
             throw cbe;
-        }
+        }
     }
-
     /**
      * Most aggregators don't need scores, make sure to extend this method if
     * your aggregator needs them.
@@ -265,9 +280,7 @@ public abstract class AggregatorBase extends Aggregator {
         try {
             doClose();
         } finally {
-            if (!this.failed) {
-                this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT);
-            }
+            this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-this.requestBytesUsed);
         }
     }
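A standalone sketch (not from this commit) of the batching pattern the new addRequestCircuitBreakerBytes javadoc asks for: track allocations locally and only report to the breaker when the untracked growth crosses a coarse threshold. The class name and the 16KB interval are illustrative; SignificantTextAggregator later in this diff applies the same pattern to its duplicate-detection trie via MEMORY_GROWTH_REPORTING_INTERVAL_BYTES.

    import java.util.function.LongConsumer;

    // Models the recommended batching: accumulate an estimate locally and report
    // to the circuit breaker in coarse increments rather than per small allocation.
    final class BatchedBreakerAccountingSketch {
        private static final long REPORT_INTERVAL_BYTES = 16 * 1024; // illustrative threshold
        private final LongConsumer addRequestCircuitBreakerBytes;    // stands in for the AggregatorBase hook
        private long currentSize;
        private long lastReportedSize;

        BatchedBreakerAccountingSketch(LongConsumer addRequestCircuitBreakerBytes) {
            this.addRequestCircuitBreakerBytes = addRequestCircuitBreakerBytes;
        }

        void onAllocation(long bytes) {
            currentSize += bytes;
            long growth = currentSize - lastReportedSize;
            if (growth > REPORT_INTERVAL_BYTES) {
                // In a real aggregator this call may throw CircuitBreakingException.
                addRequestCircuitBreakerBytes.accept(growth);
                lastReportedSize = currentSize;
            }
        }
    }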
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java
new file mode 100644
index 0000000000..c0fecd8be4
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregationBuilder.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ParseFieldRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryParseContext;
+import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationInitializationException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public class SignificantTextAggregationBuilder extends AbstractAggregationBuilder<SignificantTextAggregationBuilder> {
+    public static final String NAME = "significant_text";
+
+    static final ParseField FIELD_NAME = new ParseField("field");
+    static final ParseField SOURCE_FIELDS_NAME = new ParseField("source_fields");
+    static final ParseField FILTER_DUPLICATE_TEXT_FIELD_NAME = new ParseField(
+            "filter_duplicate_text");
+
+    static final TermsAggregator.BucketCountThresholds DEFAULT_BUCKET_COUNT_THRESHOLDS =
+            SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS;
+    static final SignificanceHeuristic DEFAULT_SIGNIFICANCE_HEURISTIC = SignificantTermsAggregationBuilder.DEFAULT_SIGNIFICANCE_HEURISTIC;
+
+    private String fieldName = null;
+    private String[] sourceFieldNames = null;
+    private boolean filterDuplicateText = false;
+    private IncludeExclude includeExclude = null;
+    private QueryBuilder filterBuilder = null;
+    private TermsAggregator.BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(
+            DEFAULT_BUCKET_COUNT_THRESHOLDS);
+    private SignificanceHeuristic significanceHeuristic = DEFAULT_SIGNIFICANCE_HEURISTIC;
+
+    public static Aggregator.Parser getParser(
+            ParseFieldRegistry<SignificanceHeuristicParser> significanceHeuristicParserRegistry) {
+        ObjectParser<SignificantTextAggregationBuilder, QueryParseContext> parser = new ObjectParser<>(
+                SignificantTextAggregationBuilder.NAME);
+
+        parser.declareInt(SignificantTextAggregationBuilder::shardSize,
+                TermsAggregationBuilder.SHARD_SIZE_FIELD_NAME);
+
+        parser.declareLong(SignificantTextAggregationBuilder::minDocCount,
+                TermsAggregationBuilder.MIN_DOC_COUNT_FIELD_NAME);
+
+        parser.declareLong(SignificantTextAggregationBuilder::shardMinDocCount,
+                TermsAggregationBuilder.SHARD_MIN_DOC_COUNT_FIELD_NAME);
+
+        parser.declareInt(SignificantTextAggregationBuilder::size,
+                TermsAggregationBuilder.REQUIRED_SIZE_FIELD_NAME);
+
+        parser.declareString(SignificantTextAggregationBuilder::fieldName, FIELD_NAME);
+
+        parser.declareStringArray(SignificantTextAggregationBuilder::sourceFieldNames, SOURCE_FIELDS_NAME);
+
+        parser.declareBoolean(SignificantTextAggregationBuilder::filterDuplicateText,
+                FILTER_DUPLICATE_TEXT_FIELD_NAME);
+
+        parser.declareObject(SignificantTextAggregationBuilder::backgroundFilter,
+                (p, context) -> context.parseInnerQueryBuilder(),
+                SignificantTermsAggregationBuilder.BACKGROUND_FILTER);
+
+        parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(v, b.includeExclude())),
+                IncludeExclude::parseInclude, IncludeExclude.INCLUDE_FIELD,
+                ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
+
+        parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(b.includeExclude(), v)),
+                IncludeExclude::parseExclude, IncludeExclude.EXCLUDE_FIELD,
+                ObjectParser.ValueType.STRING_ARRAY);
+
+        for (String name : significanceHeuristicParserRegistry.getNames()) {
+            parser.declareObject(SignificantTextAggregationBuilder::significanceHeuristic,
+                    (p, context) -> {
+                        SignificanceHeuristicParser significanceHeuristicParser = significanceHeuristicParserRegistry
+                                .lookupReturningNullIfNotFound(name);
+                        return significanceHeuristicParser.parse(context);
+                    }, new ParseField(name));
+        }
+        return new Aggregator.Parser() {
+            @Override
+            public AggregationBuilder parse(String aggregationName, QueryParseContext context)
+                    throws IOException {
+                return parser.parse(context.parser(),
+                        new SignificantTextAggregationBuilder(aggregationName, null), context);
+            }
+        };
+    }
+
+    protected TermsAggregator.BucketCountThresholds getBucketCountThresholds() {
+        return new TermsAggregator.BucketCountThresholds(bucketCountThresholds);
+    }
+
+    public TermsAggregator.BucketCountThresholds bucketCountThresholds() {
+        return bucketCountThresholds;
+    }
+
+    @Override
+    public SignificantTextAggregationBuilder subAggregations(Builder subFactories) {
+        throw new AggregationInitializationException("Aggregator [" + name + "] of type ["
+                + getType() + "] cannot accept sub-aggregations");
+    }
+
+    @Override
+    public SignificantTextAggregationBuilder subAggregation(AggregationBuilder aggregation) {
+        throw new AggregationInitializationException("Aggregator [" + name + "] of type ["
+                + getType() + "] cannot accept sub-aggregations");
+    }
+
+    public SignificantTextAggregationBuilder bucketCountThresholds(
+            TermsAggregator.BucketCountThresholds bucketCountThresholds) {
+        if (bucketCountThresholds == null) {
+            throw new IllegalArgumentException(
+                    "[bucketCountThresholds] must not be null: [" + name + "]");
+        }
+        this.bucketCountThresholds = bucketCountThresholds;
+        return this;
+    }
+
+    /**
+     * Sets the size - indicating how many term buckets should be returned
+     * (defaults to 10)
+     */
+    public SignificantTextAggregationBuilder size(int size) {
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    "[size] must be greater than 0. Found [" + size + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setRequiredSize(size);
+        return this;
+    }
+
+    /**
+     * Sets the shard_size - indicating the number of term buckets each shard
+     * will return to the coordinating node (the node that coordinates the
+     * search execution). The higher the shard size is, the more accurate the
+     * results are.
+     */
+    public SignificantTextAggregationBuilder shardSize(int shardSize) {
+        if (shardSize <= 0) {
+            throw new IllegalArgumentException("[shardSize] must be greater than 0. Found ["
+                    + shardSize + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setShardSize(shardSize);
+        return this;
+    }
+
+    /**
+     * Sets the name of the text field that will be the subject of this
+     * aggregation.
+     */
+    public SignificantTextAggregationBuilder fieldName(String fieldName) {
+        this.fieldName = fieldName;
+        return this;
+    }
+
+    /**
+     * Selects the fields to load from _source JSON and analyze.
+     * If none are specified, the indexed "fieldName" value is assumed
+     * to also be the name of the JSON field holding the value
+     */
+    public SignificantTextAggregationBuilder sourceFieldNames(List<String> names) {
+        this.sourceFieldNames = names.toArray(new String[names.size()]);
+        return this;
+    }
+
+    /**
+     * Control if duplicate paragraphs of text should be filtered from the
+     * statistical text analysis. Can improve results but slows down analysis.
+     * Default is false.
+     */
+    public SignificantTextAggregationBuilder filterDuplicateText(boolean filterDuplicateText) {
+        this.filterDuplicateText = filterDuplicateText;
+        return this;
+    }
+
+    /**
+     * Set the minimum document count terms should have in order to appear in
+     * the response.
+     */
+    public SignificantTextAggregationBuilder minDocCount(long minDocCount) {
+        if (minDocCount < 0) {
+            throw new IllegalArgumentException(
+                    "[minDocCount] must be greater than or equal to 0. Found [" + minDocCount
+                            + "] in [" + name + "]");
+        }
+        bucketCountThresholds.setMinDocCount(minDocCount);
+        return this;
+    }
+
+    /**
+     * Set the minimum document count terms should have on the shard in order to
+     * appear in the response.
+     */
+    public SignificantTextAggregationBuilder shardMinDocCount(long shardMinDocCount) {
+        if (shardMinDocCount < 0) {
+            throw new IllegalArgumentException(
+                    "[shardMinDocCount] must be greater than or equal to 0. Found ["
+                            + shardMinDocCount + "] in [" + name + "]");
+        }
Found [" + + shardMinDocCount + "] in [" + name + "]"); + } + bucketCountThresholds.setShardMinDocCount(shardMinDocCount); + return this; + } + + public SignificantTextAggregationBuilder backgroundFilter(QueryBuilder backgroundFilter) { + if (backgroundFilter == null) { + throw new IllegalArgumentException( + "[backgroundFilter] must not be null: [" + name + "]"); + } + this.filterBuilder = backgroundFilter; + return this; + } + + public QueryBuilder backgroundFilter() { + return filterBuilder; + } + + /** + * Set terms to include and exclude from the aggregation results + */ + public SignificantTextAggregationBuilder includeExclude(IncludeExclude includeExclude) { + this.includeExclude = includeExclude; + return this; + } + + /** + * Get terms to include and exclude from the aggregation results + */ + public IncludeExclude includeExclude() { + return includeExclude; + } + + public SignificantTextAggregationBuilder significanceHeuristic( + SignificanceHeuristic significanceHeuristic) { + if (significanceHeuristic == null) { + throw new IllegalArgumentException( + "[significanceHeuristic] must not be null: [" + name + "]"); + } + this.significanceHeuristic = significanceHeuristic; + return this; + } + + public SignificanceHeuristic significanceHeuristic() { + return significanceHeuristic; + } + + /** + * @param name + * the name of this aggregation + * @param fieldName + * the name of the text field that will be the subject of this + * aggregation + * + */ + public SignificantTextAggregationBuilder(String name, String fieldName) { + super(name); + this.fieldName = fieldName; + } + + /** + * Read from a stream. + */ + public SignificantTextAggregationBuilder(StreamInput in) throws IOException { + super(in); + fieldName = in.readString(); + filterDuplicateText = in.readBoolean(); + bucketCountThresholds = new BucketCountThresholds(in); + filterBuilder = in.readOptionalNamedWriteable(QueryBuilder.class); + includeExclude = in.readOptionalWriteable(IncludeExclude::new); + significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class); + sourceFieldNames = in.readOptionalStringArray(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(fieldName); + out.writeBoolean(filterDuplicateText); + bucketCountThresholds.writeTo(out); + out.writeOptionalNamedWriteable(filterBuilder); + out.writeOptionalWriteable(includeExclude); + out.writeNamedWriteable(significanceHeuristic); + out.writeOptionalStringArray(sourceFieldNames); + } + + @Override + protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<?> parent, + Builder subFactoriesBuilder) throws IOException { + SignificanceHeuristic executionHeuristic = this.significanceHeuristic.rewrite(context); + String[] execFieldNames = sourceFieldNames; + if (execFieldNames == null) { + execFieldNames = new String[] { fieldName }; + } + return new SignificantTextAggregatorFactory(name, includeExclude, filterBuilder, + bucketCountThresholds, executionHeuristic, context, parent, subFactoriesBuilder, + fieldName, execFieldNames, filterDuplicateText, metaData); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) + throws IOException { + builder.startObject(); + bucketCountThresholds.toXContent(builder, params); + if (fieldName != null) { + builder.field(FIELD_NAME.getPreferredName(), fieldName); + } + if (sourceFieldNames != null) { + builder.array(SOURCE_FIELDS_NAME.getPreferredName(), sourceFieldNames); + } + + if 
+        if (filterDuplicateText) {
+            builder.field(FILTER_DUPLICATE_TEXT_FIELD_NAME.getPreferredName(), filterDuplicateText);
+        }
+        if (filterBuilder != null) {
+            builder.field(SignificantTermsAggregationBuilder.BACKGROUND_FILTER.getPreferredName(),
+                    filterBuilder);
+        }
+        if (includeExclude != null) {
+            includeExclude.toXContent(builder, params);
+        }
+        significanceHeuristic.toXContent(builder, params);
+
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    protected int doHashCode() {
+        return Objects.hash(bucketCountThresholds, fieldName, filterDuplicateText, filterBuilder,
+                includeExclude, significanceHeuristic, Arrays.hashCode(sourceFieldNames));
+    }
+
+    @Override
+    protected boolean doEquals(Object obj) {
+        SignificantTextAggregationBuilder other = (SignificantTextAggregationBuilder) obj;
+        return Objects.equals(bucketCountThresholds, other.bucketCountThresholds)
+                && Objects.equals(fieldName, other.fieldName)
+                && Arrays.equals(sourceFieldNames, other.sourceFieldNames)
+                && filterDuplicateText == other.filterDuplicateText
+                && Objects.equals(filterBuilder, other.filterBuilder)
+                && Objects.equals(includeExclude, other.includeExclude)
+                && Objects.equals(significanceHeuristic, other.significanceHeuristic);
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+}
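A sketch (not part of this commit) of tuning the bucket-count thresholds the builder above exposes; the agg/field names and threshold values are hypothetical.

    import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;

    public class ThresholdTuningExample {
        public static SignificantTextAggregationBuilder tunedKeywords() {
            return new SignificantTextAggregationBuilder("keywords", "body")
                    .size(20)             // return the top 20 significant terms
                    .shardSize(200)       // each shard considers 200 candidates for better accuracy
                    .minDocCount(5)       // drop terms seen in fewer than 5 foreground docs
                    .shardMinDocCount(2); // cheap shard-local pre-filter before the reduce phase
        }
    }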
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java
new file mode 100644
index 0000000000..c7539a4ca0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
+import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.search.highlight.TokenStreamFromTermVector;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude.StringFilter;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.lookup.SourceLookup;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+
+public class SignificantTextAggregator extends BucketsAggregator {
+
+    private final StringFilter includeExclude;
+    protected final BucketCountThresholds bucketCountThresholds;
+    protected long numCollectedDocs;
+    private final BytesRefHash bucketOrds;
+    private final SignificanceHeuristic significanceHeuristic;
+    private SignificantTextAggregatorFactory termsAggFactory;
+    private final DocValueFormat format = DocValueFormat.RAW;
+    private final String fieldName;
+    private final String[] sourceFieldNames;
+    private DuplicateByteSequenceSpotter dupSequenceSpotter = null;
+    private long lastTrieSize;
+    private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
+
+    public SignificantTextAggregator(String name, AggregatorFactories factories,
+            SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
+            BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude,
+            SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory,
+            String fieldName, String[] sourceFieldNames, boolean filterDuplicateText,
+            Map<String, Object> metaData) throws IOException {
+        super(name, factories, context, parent, pipelineAggregators, metaData);
+        this.bucketCountThresholds = bucketCountThresholds;
+        this.includeExclude = includeExclude;
+        this.significanceHeuristic = significanceHeuristic;
+        this.termsAggFactory = termsAggFactory;
+        this.fieldName = fieldName;
+        this.sourceFieldNames = sourceFieldNames;
+        bucketOrds = new BytesRefHash(1, context.bigArrays());
+        if (filterDuplicateText) {
+            dupSequenceSpotter = new DuplicateByteSequenceSpotter();
+            lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
+        }
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+            final LeafBucketCollector sub) throws IOException {
+        final BytesRefBuilder previous = new BytesRefBuilder();
+        return new LeafBucketCollectorBase(sub, null) {
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                collectFromSource(doc, bucket, fieldName, sourceFieldNames);
+                numCollectedDocs++;
+                if (dupSequenceSpotter != null) {
+                    dupSequenceSpotter.startNewSequence();
+                }
+            }
+
+            private void processTokenStream(int doc, long bucket, TokenStream ts, String fieldText) throws IOException {
+                if (dupSequenceSpotter != null) {
+                    ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
+                }
+                CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
+                ts.reset();
+                try {
+                    // Assume tokens will average 5 bytes in length to size number of tokens
+                    BytesRefHash inDocTerms = new BytesRefHash(1 + (fieldText.length() / 5), context.bigArrays());
+
+                    try {
+                        while (ts.incrementToken()) {
+                            if (dupSequenceSpotter != null) {
+                                long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
+                                long growth = newTrieSize - lastTrieSize;
+                                // Only update the circuit breaker after a significant amount of growth
+                                if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
+                                    addRequestCircuitBreakerBytes(growth);
+                                    lastTrieSize = newTrieSize;
+                                }
+                            }
+                            previous.clear();
+                            previous.copyChars(termAtt);
+                            BytesRef bytes = previous.get();
+                            if (inDocTerms.add(bytes) >= 0) {
+                                if (includeExclude == null || includeExclude.accept(bytes)) {
+                                    long bucketOrdinal = bucketOrds.add(bytes);
+                                    if (bucketOrdinal < 0) { // already seen
+                                        bucketOrdinal = -1 - bucketOrdinal;
+                                        collectExistingBucket(sub, doc, bucketOrdinal);
+                                    } else {
+                                        collectBucket(sub, doc, bucketOrdinal);
+                                    }
+                                }
+                            }
+                        }
+                    } finally {
+                        Releasables.close(inDocTerms);
+                    }
+                } finally {
+                    ts.close();
+                }
+            }
+
+            private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException {
+                MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
+                if (fieldType == null) {
+                    throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field [" + indexedFieldName
+                            + "] since it is not present");
+                }
+
+                SourceLookup sourceLookup = context.lookup().source();
+                sourceLookup.setSegmentAndDocument(ctx, doc);
+
+                for (String sourceField : sourceFieldNames) {
+                    List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
+                    textsToHighlight = textsToHighlight.stream().map(obj -> {
+                        if (obj instanceof BytesRef) {
+                            return fieldType.valueForDisplay(obj).toString();
+                        } else {
+                            return obj;
+                        }
+                    }).collect(Collectors.toList());
+
+                    Analyzer analyzer = fieldType.indexAnalyzer();
+                    for (Object fieldValue : textsToHighlight) {
+                        String fieldText = fieldValue.toString();
+                        TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
+                        processTokenStream(doc, bucket, ts, fieldText);
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
+        assert owningBucketOrdinal == 0;
+
+        final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
+        long supersetSize = termsAggFactory.getSupersetNumDocs();
+        long subsetSize = numCollectedDocs;
+
+        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
+        SignificantStringTerms.Bucket spare = null;
+        for (int i = 0; i < bucketOrds.size(); i++) {
+            final int docCount = bucketDocCount(i);
+            if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                continue;
+            }
+
+            if (spare == null) {
+                spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+            }
+
+            bucketOrds.get(i, spare.termBytes);
+            spare.subsetDf = docCount;
+            spare.subsetSize = subsetSize;
+            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
+            spare.supersetSize = supersetSize;
+            // During shard-local down-selection we use subset/superset stats
+            // that are for this shard only
+            // Back at the central reducer these properties will be updated with
+            // global stats
+            spare.updateScore(significanceHeuristic);
+
+            spare.bucketOrd = i;
+            spare = ordered.insertWithOverflow(spare);
+        }
+
+        final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
+        for (int i = ordered.size() - 1; i >= 0; i--) {
+            final SignificantStringTerms.Bucket bucket = ordered.pop();
+            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
+            bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
+            bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+            list[i] = bucket;
+        }
+
+        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(), pipelineAggregators(),
+                metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
+    }
+
+    @Override
+    public SignificantStringTerms buildEmptyAggregation() {
+        // We need to account for the significance of a miss in our global stats - provide corpus size as context
+        ContextIndexSearcher searcher = context.searcher();
+        IndexReader topReader = searcher.getIndexReader();
+        int supersetSize = topReader.numDocs();
+        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(bucketOrds, termsAggFactory);
+    }
+
+}
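Another hedged sketch of the builder API from this commit: narrowing the background statistics with backgroundFilter, and pulling raw text from alternative _source fields with source_fields (useful when the indexed field was populated via copy_to). Index/field names are hypothetical.

    import java.util.Arrays;

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;

    public class BackgroundFilterExample {
        public static SignificantTextAggregationBuilder sportKeywords() {
            // "everything" is a hypothetical copy_to destination field.
            return new SignificantTextAggregationBuilder("keywords", "everything")
                    // Compute superset stats against sport docs only, not the whole index.
                    .backgroundFilter(QueryBuilders.termQuery("category", "sport"))
                    // Point the agg at the original JSON fields so it can read the
                    // raw text from _source rather than the copy_to destination.
                    .sourceFieldNames(Arrays.asList("title", "body"));
        }
    }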
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java
new file mode 100644
index 0000000000..629f698994
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.significant;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
+import org.elasticsearch.common.lucene.index.FreqTermsEnum;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.BucketUtils;
+import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
+import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class SignificantTextAggregatorFactory extends AggregatorFactory<SignificantTextAggregatorFactory>
+        implements Releasable {
+
+    private final IncludeExclude includeExclude;
+    private String indexedFieldName;
+    private MappedFieldType fieldType;
+    private final String[] sourceFieldNames;
+    private FilterableTermsEnum termsEnum;
+    private int numberOfAggregatorsCreated;
+    private final Query filter;
+    private final int supersetNumDocs;
+    private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
+    private final SignificanceHeuristic significanceHeuristic;
+    private final DocValueFormat format = DocValueFormat.RAW;
+    private final boolean filterDuplicateText;
+
+    public SignificantTextAggregatorFactory(String name, IncludeExclude includeExclude,
+            QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
+            SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
+            AggregatorFactories.Builder subFactoriesBuilder, String fieldName, String[] sourceFieldNames,
+            boolean filterDuplicateText, Map<String, Object> metaData) throws IOException {
+        super(name, context, parent, subFactoriesBuilder, metaData);
+        this.includeExclude = includeExclude;
+        this.filter = filterBuilder == null
+                ? null
+                : filterBuilder.toQuery(context.getQueryShardContext());
+        this.indexedFieldName = fieldName;
+        this.sourceFieldNames = sourceFieldNames;
+        this.filterDuplicateText = filterDuplicateText;
+        IndexSearcher searcher = context.searcher();
+        // Important - need to use the doc count that includes deleted docs
+        // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
+        this.supersetNumDocs = filter == null
+                ? searcher.getIndexReader().maxDoc()
+                : searcher.count(filter);
+        this.bucketCountThresholds = bucketCountThresholds;
+        this.significanceHeuristic = significanceHeuristic;
+        fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
+    }
+
+    /**
+     * Get the number of docs in the superset.
+     */
+    public long getSupersetNumDocs() {
+        return supersetNumDocs;
+    }
+
+    private FilterableTermsEnum getTermsEnum(String field) throws IOException {
+        if (termsEnum != null) {
+            return termsEnum;
+        }
+        IndexReader reader = context.searcher().getIndexReader();
+        if (numberOfAggregatorsCreated > 1) {
+            termsEnum = new FreqTermsEnum(reader, field, true, false, filter, context.bigArrays());
+        } else {
+            termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter);
+        }
+        return termsEnum;
+    }
+
+    private long getBackgroundFrequency(String value) throws IOException {
+        Query query = fieldType.termQuery(value, context.getQueryShardContext());
+        if (query instanceof TermQuery) {
+            // for types that use the inverted index, we prefer using a caching terms
+            // enum that will do a better job at reusing index inputs
+            Term term = ((TermQuery) query).getTerm();
+            FilterableTermsEnum termsEnum = getTermsEnum(term.field());
+            if (termsEnum.seekExact(term.bytes())) {
+                return termsEnum.docFreq();
+            } else {
+                return 0;
+            }
+        }
+        // otherwise do it the naive way
+        if (filter != null) {
+            query = new BooleanQuery.Builder()
+                    .add(query, Occur.FILTER)
+                    .add(filter, Occur.FILTER)
+                    .build();
+        }
+        return context.searcher().count(query);
+    }
+
+    public long getBackgroundFrequency(BytesRef termBytes) throws IOException {
+        String value = format.format(termBytes);
+        return getBackgroundFrequency(value);
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (termsEnum instanceof Releasable) {
+                ((Releasable) termsEnum).close();
+            }
+        } finally {
+            termsEnum = null;
+        }
+    }
+
+    @Override
+    protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+            throws IOException {
+        if (collectsFromSingleBucket == false) {
+            return asMultiBucketAggregator(this, context, parent);
+        }
+
+        numberOfAggregatorsCreated++;
+        BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
+        if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
+            // The user has not made a shardSize selection.
+            // Use default heuristic to avoid any wrong-ranking caused by
+            // distributed counting but request double the usual amount.
+            // We typically need more than the number of "top" terms requested
+            // by other aggregations as the significance algorithm is in less
+            // of a position to down-select at shard-level - some of the things
+            // we want to find have only one occurrence on each shard and as
+            // such are impossible to differentiate from non-significant terms
+            // at that early stage.
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
+                    context.numberOfShards()));
+        }
+
+        // TODO - need to check with mapping that this is indeed a text field....
+
+        IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null :
+                includeExclude.convertToStringFilter(DocValueFormat.RAW);
+
+        return new SignificantTextAggregator(name, factories, context, parent, pipelineAggregators, bucketCountThresholds,
+                incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metaData);
+    }
+}
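Finally, a sketch of swapping the default significance heuristic, which this aggregation shares with significant_terms. This assumes the pre-existing GND heuristic's single-boolean constructor (backgroundIsSuperset); the commit only imports that class, it does not define it, so treat the signature as an assumption.

    import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
    import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;

    public class HeuristicExample {
        public static SignificantTextAggregationBuilder gndKeywords() {
            return new SignificantTextAggregationBuilder("keywords", "body") // hypothetical names
                    // true = the background set is a superset of the foreground set,
                    // which holds here because no backgroundFilter is applied.
                    .significanceHeuristic(new GND(true));
        }
    }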