summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorColin Goodheart-Smithe <colings86@users.noreply.github.com>2016-08-25 09:35:23 +0100
committerColin Goodheart-Smithe <colings86@users.noreply.github.com>2016-08-25 14:57:52 +0100
commitf5fbb3eb8b091ee61101b77cbf208bfeb0766c4e (patch)
tree2c2c641664998e80c750a7be0b7e143c34ff0832
parentb521638f52b359de7a25f5d7719e0ec9e87d3cad (diff)
Fix agg profiling when using breadth_first collect mode
Previous to this change the nesting of aggregation profiling results would be incorrect when the request contains a terms aggregation and the collect mode is (implicitly or explicitly) set to `breadth_first`. This was because the aggregation profiling has to make the assumption that the `preCollection()` method of children aggregations is always called in the `preCollection()` method of their parent aggregation. When the collect mode is `breadth_first` the `preCollection` of the children aggregations was delayed until the documents were replayed. This change moves the `preCollection()` of deferred aggregations to run during the `preCollection()` of the parent aggregation. This should have no adverse impact on the breadth_first mode as there is no allocation of memory in any of the aggregations. We also apply the same logic to the diversified sampler aggregation as we did to the terms aggregation to move the `preCollection()` of the child aggregations method to be called during the `preCollection()` of the parent aggregation. This commit also includes a fix so that the `ProfilingLeafBucketCollector` propagates the scorer to its delegate so the diversified sampler agg works when profiling is enabled.
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java12
-rw-r--r--core/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java6
-rw-r--r--core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java126
4 files changed, 139 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
index b7cab31974..7c6ebae740 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
@@ -25,11 +25,9 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
-import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -119,6 +117,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
@Override
public void preCollection() throws IOException {
+ collector.preCollection();
}
@Override
@@ -145,7 +144,6 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
}
this.selectedBuckets = hash;
- collector.preCollection();
boolean needsScores = collector.needsScores();
Weight weight = null;
if (needsScores) {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
index 2336923467..90316c1a00 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
@@ -48,7 +48,7 @@ import java.util.List;
* {@link BestDocsDeferringCollector#createTopDocsCollector(int)} is designed to
* be overridden and allows subclasses to choose a custom collector
* implementation for determining the top N matches.
- *
+ *
*/
public class BestDocsDeferringCollector extends DeferringBucketCollector implements Releasable {
@@ -61,7 +61,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
/**
* Sole constructor.
- *
+ *
* @param shardSize
* The number of top-scoring docs to collect for each bucket
*/
@@ -111,6 +111,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
@Override
public void preCollection() throws IOException {
+ deferred.preCollection();
}
@Override
@@ -125,7 +126,6 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}
private void runDeferredAggs() throws IOException {
- deferred.preCollection();
List<ScoreDoc> allDocs = new ArrayList<>(shardSize);
for (int i = 0; i < perBucketSamples.size(); i++) {
@@ -135,14 +135,14 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}
perBucketSample.getMatches(allDocs);
}
-
+
// Sort the top matches by docID for the benefit of deferred collector
ScoreDoc[] docsArr = allDocs.toArray(new ScoreDoc[allDocs.size()]);
Arrays.sort(docsArr, new Comparator<ScoreDoc>() {
@Override
public int compare(ScoreDoc o1, ScoreDoc o2) {
if(o1.doc == o2.doc){
- return o1.shardIndex - o2.shardIndex;
+ return o1.shardIndex - o2.shardIndex;
}
return o1.doc - o2.doc;
}
@@ -256,7 +256,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
currentScore = scoreDoc.score;
currentDocId = rebased;
// We stored the bucket ID in Lucene's shardIndex property
- // for convenience.
+ // for convenience.
leafCollector.collect(rebased, scoreDoc.shardIndex);
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java b/core/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java
index 75c90ded70..addf910bc5 100644
--- a/core/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java
+++ b/core/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java
@@ -19,6 +19,7 @@
package org.elasticsearch.search.profile.aggregation;
+import org.apache.lucene.search.Scorer;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
@@ -40,4 +41,9 @@ public class ProfilingLeafBucketCollector extends LeafBucketCollector {
profileBreakdown.stopAndRecordTime();
}
+ @Override
+ public void setScorer(Scorer scorer) throws IOException {
+ delegate.setScorer(scorer);
+ }
+
}
diff --git a/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java b/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java
index f245629a28..342da16f50 100644
--- a/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java
+++ b/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java
@@ -21,6 +21,8 @@ package org.elasticsearch.search.profile.aggregation;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
+import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedOrdinalsSamplerAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregator;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregator;
@@ -37,6 +39,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
@@ -187,6 +190,129 @@ public class AggregationProfilerIT extends ESIntegTestCase {
}
}
+ public void testMultiLevelProfileBreadthFirst() {
+ SearchResponse response = client().prepareSearch("idx").setProfile(true)
+ .addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L).subAggregation(terms("terms")
+ .collectMode(SubAggCollectionMode.BREADTH_FIRST).field(TAG_FIELD).subAggregation(avg("avg").field(NUMBER_FIELD))))
+ .get();
+ assertSearchResponse(response);
+ Map<String, ProfileShardResult> profileResults = response.getProfileResults();
+ assertThat(profileResults, notNullValue());
+ assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
+ for (ProfileShardResult profileShardResult : profileResults.values()) {
+ assertThat(profileShardResult, notNullValue());
+ AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
+ assertThat(aggProfileResults, notNullValue());
+ List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
+ assertThat(aggProfileResultsList, notNullValue());
+ assertThat(aggProfileResultsList.size(), equalTo(1));
+ ProfileResult histoAggResult = aggProfileResultsList.get(0);
+ assertThat(histoAggResult, notNullValue());
+ assertThat(histoAggResult.getQueryName(),
+ equalTo("org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator"));
+ assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
+ assertThat(histoAggResult.getTime(), greaterThan(0L));
+ Map<String, Long> histoBreakdown = histoAggResult.getTimeBreakdown();
+ assertThat(histoBreakdown, notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
+ assertThat(histoAggResult.getProfiledChildren().size(), equalTo(1));
+
+ ProfileResult termsAggResult = histoAggResult.getProfiledChildren().get(0);
+ assertThat(termsAggResult, notNullValue());
+ assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName()));
+ assertThat(termsAggResult.getLuceneDescription(), equalTo("terms"));
+ assertThat(termsAggResult.getTime(), greaterThan(0L));
+ Map<String, Long> termsBreakdown = termsAggResult.getTimeBreakdown();
+ assertThat(termsBreakdown, notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
+ assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
+
+ ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
+ assertThat(avgAggResult, notNullValue());
+ assertThat(avgAggResult.getQueryName(), equalTo(AvgAggregator.class.getName()));
+ assertThat(avgAggResult.getLuceneDescription(), equalTo("avg"));
+ assertThat(avgAggResult.getTime(), greaterThan(0L));
+ Map<String, Long> avgBreakdown = termsAggResult.getTimeBreakdown();
+ assertThat(avgBreakdown, notNullValue());
+ assertThat(avgBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
+ assertThat(avgBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
+ assertThat(avgBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
+ assertThat(avgBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
+ assertThat(avgBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
+ assertThat(avgBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
+ assertThat(avgBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
+ assertThat(avgBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
+ assertThat(avgAggResult.getProfiledChildren().size(), equalTo(0));
+ }
+ }
+
+ public void testDiversifiedAggProfile() {
+ SearchResponse response = client().prepareSearch("idx").setProfile(true)
+ .addAggregation(diversifiedSampler("diversify").shardSize(10).field(STRING_FIELD).maxDocsPerValue(2)
+ .subAggregation(max("max").field(NUMBER_FIELD)))
+ .get();
+ assertSearchResponse(response);
+ Map<String, ProfileShardResult> profileResults = response.getProfileResults();
+ assertThat(profileResults, notNullValue());
+ assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
+ for (ProfileShardResult profileShardResult : profileResults.values()) {
+ assertThat(profileShardResult, notNullValue());
+ AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
+ assertThat(aggProfileResults, notNullValue());
+ List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
+ assertThat(aggProfileResultsList, notNullValue());
+ assertThat(aggProfileResultsList.size(), equalTo(1));
+ ProfileResult diversifyAggResult = aggProfileResultsList.get(0);
+ assertThat(diversifyAggResult, notNullValue());
+ assertThat(diversifyAggResult.getQueryName(),
+ equalTo(DiversifiedOrdinalsSamplerAggregator.class.getName()));
+ assertThat(diversifyAggResult.getLuceneDescription(), equalTo("diversify"));
+ assertThat(diversifyAggResult.getTime(), greaterThan(0L));
+ Map<String, Long> histoBreakdown = diversifyAggResult.getTimeBreakdown();
+ assertThat(histoBreakdown, notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
+ assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
+ assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
+ assertThat(diversifyAggResult.getProfiledChildren().size(), equalTo(1));
+
+ ProfileResult maxAggResult = diversifyAggResult.getProfiledChildren().get(0);
+ assertThat(maxAggResult, notNullValue());
+ assertThat(maxAggResult.getQueryName(), equalTo(MaxAggregator.class.getName()));
+ assertThat(maxAggResult.getLuceneDescription(), equalTo("max"));
+ assertThat(maxAggResult.getTime(), greaterThan(0L));
+ Map<String, Long> termsBreakdown = maxAggResult.getTimeBreakdown();
+ assertThat(termsBreakdown, notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
+ assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
+ assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
+ assertThat(maxAggResult.getProfiledChildren().size(), equalTo(0));
+ }
+ }
+
public void testComplexProfile() {
SearchResponse response = client().prepareSearch("idx").setProfile(true)
.addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L)