summaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-06-30 15:19:58 -0400
committerNik Everett <nik9000@gmail.com>2016-07-01 09:13:15 -0400
commit91b66e3cf488cc2961bab106de68eae6b2b3cf5c (patch)
tree907664ed6946321464bd90ea4d9d298cfc91f380 /core/src/main/java/org
parent1297a707da7bdc97dfdacb59f23dd1e918c14d63 (diff)
Migration stats and extended stats to NamedWriteable
Migrates the `stats` and `extended_stats` aggregations and pipeline aggregations from the special purpose aggregations streams to `NamedWriteable`. These are the first pipeline aggregations so this adds the infrastructure to support both streams and `NamedWriteable`s for pipeline aggregations.
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java65
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java47
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java91
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java8
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java8
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java59
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java64
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java9
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java50
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java28
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java28
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java48
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java28
-rw-r--r--core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java23
16 files changed, 276 insertions, 292 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index cd9d6f8a3f..ee05a72e40 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -91,8 +91,8 @@ import org.elasticsearch.index.query.functionscore.ScriptScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.WeightBuilder;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.action.SearchTransportService;
-import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@@ -119,6 +119,7 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeParser;
@@ -129,7 +130,6 @@ import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanc
import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser;
import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeParser;
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedSamplerParser;
@@ -200,40 +200,42 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.InternalStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketParser;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.InternalExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
-import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
-import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
@@ -445,16 +447,31 @@ public class SearchModule extends AbstractModule {
* Register a pipeline aggregation.
*
* @param reader reads the aggregation builder from a stream
+ * @param internalReader reads the {@link PipelineAggregator} from a stream
+ * @param internalReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
*/
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
+ Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader,
PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
+ if (false == transportClient) {
+ namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
+ pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
+ }
+ namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader);
+ namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader);
+ }
+
+ public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
+ PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
+ // NORELEASE remove me in favor of the above method
pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
}
+
@Override
protected void configure() {
if (false == transportClient) {
@@ -475,9 +492,10 @@ public class SearchModule extends AbstractModule {
registerAggregation(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerAggregation(StatsAggregationBuilder::new, new StatsParser(), StatsAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerAggregation(ExtendedStatsAggregationBuilder::new, new ExtendedStatsParser(),
- ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerAggregation(StatsAggregationBuilder::new, InternalStats::new, new StatsParser(),
+ StatsAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerAggregation(ExtendedStatsAggregationBuilder::new, InternalExtendedStats::new, new ExtendedStatsParser(),
+ ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ValueCountAggregationBuilder::new, new ValueCountParser(), ValueCountAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(PercentilesAggregationBuilder::new, new PercentilesParser(),
PercentilesAggregationBuilder.AGGREGATION_NAME_FIELD);
@@ -522,6 +540,7 @@ public class SearchModule extends AbstractModule {
ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD);
+
registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregationBuilder::parse,
DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
@@ -532,9 +551,11 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregationBuilder.PARSER,
+ registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new,
+ InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER,
StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, new ExtendedStatsBucketParser(),
+ registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new,
+ InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(),
ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
@@ -703,8 +724,6 @@ public class SearchModule extends AbstractModule {
InternalSum.registerStreams();
InternalMin.registerStreams();
InternalMax.registerStreams();
- InternalStats.registerStreams();
- InternalExtendedStats.registerStreams();
InternalValueCount.registerStreams();
InternalTDigestPercentiles.registerStreams();
InternalTDigestPercentileRanks.registerStreams();
@@ -749,8 +768,6 @@ public class SearchModule extends AbstractModule {
MinBucketPipelineAggregator.registerStreams();
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
- StatsBucketPipelineAggregator.registerStreams();
- ExtendedStatsBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
index b75244ce88..93ecfa975b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
@@ -151,9 +151,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add(pipelineAggregator);
+ if (in.readBoolean()) {
+ pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
+ } else {
+ BytesReference type = in.readBytesReference();
+ PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
+ pipelineAggregators.add(pipelineAggregator);
+ }
}
}
}
@@ -174,9 +178,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add(pipelineAggregator);
+ if (in.readBoolean()) {
+ pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
+ } else {
+ BytesReference type = in.readBytesReference();
+ PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
+ pipelineAggregators.add(pipelineAggregator);
+ }
}
}
doReadFrom(in);
@@ -188,12 +196,20 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
@Override
public final void writeTo(StreamOutput out) throws IOException {
- out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
+ out.writeString(name); // NORELEASE remote writing the name? it is automatically handled with writeNamedWriteable
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
- out.writeBytesReference(pipelineAggregator.type().stream());
- pipelineAggregator.writeTo(out);
+ // NORELEASE temporary hack to support old style streams and new style NamedWriteable
+ try {
+ pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
+ out.writeBoolean(true);
+ out.writeNamedWriteable(pipelineAggregator);
+ } catch (UnsupportedOperationException e) {
+ out.writeBoolean(false);
+ out.writeBytesReference(pipelineAggregator.type().stream());
+ pipelineAggregator.writeTo(out);
+ }
}
doWriteTo(out);
}
@@ -201,21 +217,22 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
- public String getName() {
- return name;
- }
-
- @Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
+ @Override
+ public String getName() {
+ return name;
+ }
+
/**
* @return The {@link Type} of this aggregation
*/
public Type type() {
- throw new UnsupportedOperationException("Use getWriteableName instead"); // NORELEASE remove me
+ // NORELEASE remove this method
+ throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
}
/**
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java
index cbbb349310..e060826c24 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java
@@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -31,26 +30,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
-/**
-*
-*/
public class InternalStats extends InternalNumericMetricsAggregation.MultiValue implements Stats {
-
- public final static Type TYPE = new Type("stats");
-
- public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
- @Override
- public InternalStats readResult(StreamInput in) throws IOException {
- InternalStats result = new InternalStats();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- AggregationStreams.registerStream(STREAM, TYPE.stream());
- }
-
enum Metrics {
count, sum, min, max, avg;
@@ -60,12 +40,10 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
}
- protected long count;
- protected double min;
- protected double max;
- protected double sum;
-
- protected InternalStats() {} // for serialization
+ protected final long count;
+ protected final double min;
+ protected final double max;
+ protected final double sum;
public InternalStats(String name, long count, double sum, double min, double max, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
@@ -78,6 +56,36 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
this.format = formatter;
}
+ /**
+ * Read from a stream.
+ */
+ public InternalStats(StreamInput in) throws IOException {
+ super(in);
+ format = in.readNamedWriteable(DocValueFormat.class);
+ count = in.readVLong();
+ min = in.readDouble();
+ max = in.readDouble();
+ sum = in.readDouble();
+ }
+
+ @Override
+ protected final void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(format);
+ out.writeVLong(count);
+ out.writeDouble(min);
+ out.writeDouble(max);
+ out.writeDouble(sum);
+ writeOtherStatsTo(out);
+ }
+
+ protected void writeOtherStatsTo(StreamOutput out) throws IOException {
+ }
+
+ @Override
+ public String getWriteableName() {
+ return StatsAggregationBuilder.NAME;
+ }
+
@Override
public long getCount() {
return count;
@@ -129,11 +137,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
@Override
- public Type type() {
- return TYPE;
- }
-
- @Override
public double value(String name) {
Metrics metrics = Metrics.valueOf(name);
switch (metrics) {
@@ -163,32 +166,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
return new InternalStats(name, count, sum, min, max, format, pipelineAggregators(), getMetaData());
}
- @Override
- protected void doReadFrom(StreamInput in) throws IOException {
- format = in.readNamedWriteable(DocValueFormat.class);
- count = in.readVLong();
- min = in.readDouble();
- max = in.readDouble();
- sum = in.readDouble();
- readOtherStatsFrom(in);
- }
-
- public void readOtherStatsFrom(StreamInput in) throws IOException {
- }
-
- @Override
- protected void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(format);
- out.writeVLong(count);
- out.writeDouble(min);
- out.writeDouble(max);
- out.writeDouble(sum);
- writeOtherStatsTo(out);
- }
-
- protected void writeOtherStatsTo(StreamOutput out) throws IOException {
- }
-
static class Fields {
public static final String COUNT = "count";
public static final String MIN = "min";
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java
index 4fd80e0449..cac1d92101 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java
@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@@ -36,18 +37,19 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
public class StatsAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, StatsAggregationBuilder> {
- public static final String NAME = InternalStats.TYPE.name();
+ public static final String NAME = "stats";
+ private final static Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public StatsAggregationBuilder(String name) {
- super(name, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+ super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public StatsAggregationBuilder(StreamInput in) throws IOException {
- super(in, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+ super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java
index a2b961f1fc..a28508c217 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java
@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@@ -38,20 +39,21 @@ import java.util.Objects;
public class ExtendedStatsAggregationBuilder
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, ExtendedStatsAggregationBuilder> {
- public static final String NAME = InternalExtendedStats.TYPE.name();
+ public static final String NAME = "extended_stats";
+ public final static Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double sigma = 2.0;
public ExtendedStatsAggregationBuilder(String name) {
- super(name, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+ super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public ExtendedStatsAggregationBuilder(StreamInput in) throws IOException {
- super(in, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+ super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
sigma = in.readDouble();
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java
index eff72797b5..d848001171 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java
@@ -18,12 +18,10 @@
*/
package org.elasticsearch.search.aggregations.metrics.stats.extended;
-import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -32,26 +30,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
-/**
-*
-*/
public class InternalExtendedStats extends InternalStats implements ExtendedStats {
-
- public final static Type TYPE = new Type("extended_stats", "estats");
-
- public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
- @Override
- public InternalExtendedStats readResult(StreamInput in) throws IOException {
- InternalExtendedStats result = new InternalExtendedStats();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- AggregationStreams.registerStream(STREAM, TYPE.stream());
- }
-
enum Metrics {
count, sum, min, max, avg, sum_of_squares, variance, std_deviation, std_upper, std_lower;
@@ -61,10 +40,8 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
}
- private double sumOfSqrs;
- private double sigma;
-
- protected InternalExtendedStats() {} // for serialization
+ private final double sumOfSqrs;
+ private final double sigma;
public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
@@ -73,9 +50,24 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
this.sigma = sigma;
}
+ /**
+ * Read from a stream.
+ */
+ public InternalExtendedStats(StreamInput in) throws IOException {
+ super(in);
+ sumOfSqrs = in.readDouble();
+ sigma = in.readDouble();
+ }
+
@Override
- public Type type() {
- return TYPE;
+ protected void writeOtherStatsTo(StreamOutput out) throws IOException {
+ out.writeDouble(sumOfSqrs);
+ out.writeDouble(sigma);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ExtendedStatsAggregationBuilder.NAME;
}
@Override
@@ -157,19 +149,6 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
format, pipelineAggregators(), getMetaData());
}
- @Override
- public void readOtherStatsFrom(StreamInput in) throws IOException {
- sumOfSqrs = in.readDouble();
- sigma = in.readDouble();
- }
-
- @Override
- protected void writeOtherStatsTo(StreamOutput out) throws IOException {
- out.writeDouble(sumOfSqrs);
- out.writeDouble(sigma);
- }
-
-
static class Fields {
public static final String SUM_OF_SQRS = "sum_of_squares";
public static final String SUM_OF_SQRS_AS_STRING = "sum_of_squares_as_string";
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
index 8f7ea955e9..8c43f7460d 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
@@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -33,7 +34,8 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import java.io.IOException;
import java.util.Map;
-public abstract class PipelineAggregator implements Streamable {
+public abstract class PipelineAggregator implements Streamable, NamedWriteable {
+ // NORELEASE remove Streamable
/**
* Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}.
@@ -73,25 +75,36 @@ public abstract class PipelineAggregator implements Streamable {
this.metaData = metaData;
}
- public String name() {
- return name;
+ /**
+ * Read from a stream.
+ */
+ protected PipelineAggregator(StreamInput in) throws IOException {
+ name = in.readString();
+ bucketsPaths = in.readStringArray();
+ metaData = in.readMap();
}
- public String[] bucketsPaths() {
- return bucketsPaths;
+ @Override
+ public final void readFrom(StreamInput in) throws IOException {
+ try {
+ getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
+ assert false : "Used reading constructor instead";
+ } catch (UnsupportedOperationException e) {
+ // OK
+ }
+ name = in.readString();
+ bucketsPaths = in.readStringArray();
+ metaData = in.readMap();
+ doReadFrom(in);
}
- public Map<String, Object> metaData() {
- return metaData;
+ protected void doReadFrom(StreamInput in) throws IOException {
+ throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
}
- public abstract Type type();
-
- public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
-
@Override
public final void writeTo(StreamOutput out) throws IOException {
- out.writeString(name);
+ out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
out.writeStringArray(bucketsPaths);
out.writeMap(metaData);
doWriteTo(out);
@@ -100,12 +113,27 @@ public abstract class PipelineAggregator implements Streamable {
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
- public final void readFrom(StreamInput in) throws IOException {
- name = in.readString();
- bucketsPaths = in.readStringArray();
- metaData = in.readMap();
- doReadFrom(in);
+ public String getWriteableName() {
+ // NORELEASE remove me when all InternalAggregations override it
+ throw new UnsupportedOperationException("Override on every class");
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String[] bucketsPaths() {
+ return bucketsPaths;
+ }
+
+ public Map<String, Object> metaData() {
+ return metaData;
}
- protected abstract void doReadFrom(StreamInput in) throws IOException;
+ public Type type() {
+ // NORELEASE remove this method
+ throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
+ }
+
+ public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
index cc4681c903..f69e81fc8b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
@@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.pipeline;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@@ -27,6 +28,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -43,6 +45,13 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
super(name, bucketsPaths, metaData);
}
+ /**
+ * Read from a stream.
+ */
+ protected SiblingPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ }
+
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java
index 06b093a473..c09d303f4e 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java
@@ -59,6 +59,35 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
this.format = format;
}
+ /**
+ * Read from a stream.
+ */
+ protected BucketMetricsPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ format = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ }
+
+ @Override
+ public final void doReadFrom(StreamInput in) throws IOException {
+ format = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ innerReadFrom(in);
+ }
+
+ protected void innerReadFrom(StreamInput in) throws IOException {
+ }
+
+ @Override
+ public final void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(format);
+ gapPolicy.writeTo(out);
+ innerWriteTo(out);
+ }
+
+ protected void innerWriteTo(StreamOutput out) throws IOException {
+ }
+
@Override
public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
preCollection();
@@ -109,25 +138,4 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
* for this bucket
*/
protected abstract void collectBucketValue(String bucketKey, Double bucketValue);
-
- @Override
- public final void doReadFrom(StreamInput in) throws IOException {
- format = in.readNamedWriteable(DocValueFormat.class);
- gapPolicy = GapPolicy.readFrom(in);
- innerReadFrom(in);
- }
-
- protected void innerReadFrom(StreamInput in) throws IOException {
- }
-
- @Override
- public final void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(format);
- gapPolicy.writeTo(out);
- innerWriteTo(out);
- }
-
- protected void innerWriteTo(StreamOutput out) throws IOException {
- }
-
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java
index 84f8b26575..371e5bf5e8 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java
@@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -31,34 +30,21 @@ import java.util.List;
import java.util.Map;
public class InternalStatsBucket extends InternalStats implements StatsBucket {
-
- public final static Type TYPE = new Type("stats_bucket");
-
- public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
- @Override
- public InternalStatsBucket readResult(StreamInput in) throws IOException {
- InternalStatsBucket result = new InternalStatsBucket();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- AggregationStreams.registerStream(STREAM, TYPE.stream());
- }
-
public InternalStatsBucket(String name, long count, double sum, double min, double max, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, count, sum, min, max, formatter, pipelineAggregators, metaData);
}
- InternalStatsBucket() {
- // For serialization
+ /**
+ * Read from a stream.
+ */
+ public InternalStatsBucket(StreamInput in) throws IOException {
+ super(in);
}
@Override
- public Type type() {
- return TYPE;
+ public String getWriteableName() {
+ return StatsBucketPipelineAggregationBuilder.NAME;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java
index d3418e897e..9e0432b288 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java
@@ -35,11 +35,11 @@ import java.util.List;
import java.util.Map;
public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<StatsBucketPipelineAggregationBuilder> {
- public static final String NAME = StatsBucketPipelineAggregator.TYPE.name();
+ public static final String NAME = "stats_bucket";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public StatsBucketPipelineAggregationBuilder(String name, String bucketsPath) {
- super(name, StatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+ super(name, NAME, new String[] { bucketsPath });
}
/**
@@ -47,7 +47,7 @@ public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipeline
*/
public StatsBucketPipelineAggregationBuilder(StreamInput in)
throws IOException {
- super(in, StatsBucketPipelineAggregator.TYPE.name());
+ super(in, NAME);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java
index 8be8a8462b..4b1febf444 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java
@@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import java.io.IOException;
@@ -33,23 +31,6 @@ import java.util.List;
import java.util.Map;
public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
- public final static Type TYPE = new Type("stats_bucket");
-
- public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
- StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- InternalStatsBucket.registerStreams();
- }
-
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
@@ -60,13 +41,13 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat
super(name, bucketsPaths, gapPolicy, formatter, metaData);
}
- StatsBucketPipelineAggregator() {
- // For serialization
+ public StatsBucketPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
}
@Override
- public Type type() {
- return TYPE;
+ public String getWriteableName() {
+ return StatsBucketPipelineAggregationBuilder.NAME;
}
@Override
@@ -89,5 +70,4 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalStatsBucket(name(), count, sum, min, max, format, pipelineAggregators, metadata);
}
-
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java
index 792b9d1f7f..82920be6cf 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java
@@ -36,20 +36,20 @@ import java.util.Objects;
public class ExtendedStatsBucketPipelineAggregationBuilder
extends BucketMetricsPipelineAggregationBuilder<ExtendedStatsBucketPipelineAggregationBuilder> {
- public static final String NAME = ExtendedStatsBucketPipelineAggregator.TYPE.name();
+ public static final String NAME = "extended_stats_bucket";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double sigma = 2.0;
public ExtendedStatsBucketPipelineAggregationBuilder(String name, String bucketsPath) {
- super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+ super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public ExtendedStatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException {
- super(in, ExtendedStatsBucketPipelineAggregator.TYPE.name());
+ super(in, NAME);
sigma = in.readDouble();
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java
index d5e8628d6b..03ed20d494 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java
@@ -23,10 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import java.io.IOException;
@@ -34,29 +32,12 @@ import java.util.List;
import java.util.Map;
public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
- public final static Type TYPE = new Type("extended_stats_bucket");
-
- public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
- ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- InternalExtendedStatsBucket.registerStreams();
- }
-
+ private final double sigma;
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
private double sumOfSqrs = 1;
- private double sigma;
protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy,
DocValueFormat formatter, Map<String, Object> metaData) {
@@ -64,13 +45,22 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
this.sigma = sigma;
}
- ExtendedStatsBucketPipelineAggregator() {
- // For Serialization
+ /**
+ * Read from a stream.
+ */
+ public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ sigma = in.readDouble();
+ }
+
+ @Override
+ protected void innerWriteTo(StreamOutput out) throws IOException {
+ out.writeDouble(sigma);
}
@Override
- public Type type() {
- return TYPE;
+ public String getWriteableName() {
+ return ExtendedStatsBucketPipelineAggregationBuilder.NAME;
}
@Override
@@ -95,14 +85,4 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, format, pipelineAggregators, metadata);
}
-
- @Override
- protected void innerReadFrom(StreamInput in) throws IOException {
- sigma = in.readDouble();
- }
-
- @Override
- protected void innerWriteTo(StreamOutput out) throws IOException {
- out.writeDouble(sigma);
- }
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java
index 548e644329..5589a9ebbc 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java
@@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.exten
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -31,35 +30,22 @@ import java.util.List;
import java.util.Map;
public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket {
-
- public final static Type TYPE = new Type("extended_stats_bucket");
-
- public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
- @Override
- public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException {
- InternalExtendedStatsBucket result = new InternalExtendedStatsBucket();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- AggregationStreams.registerStream(STREAM, TYPE.stream());
- }
-
InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData);
}
- InternalExtendedStatsBucket() {
- // for serialization
+ /**
+ * Read from a stream.
+ */
+ public InternalExtendedStatsBucket(StreamInput in) throws IOException {
+ super(in);
}
@Override
- public Type type() {
- return TYPE;
+ public String getWriteableName() {
+ return ExtendedStatsBucketPipelineAggregationBuilder.NAME;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index be8c895eec..4fc814a044 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -224,9 +224,14 @@ public class QuerySearchResult extends QuerySearchResultProvider {
int size = in.readVInt();
List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
+ // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
+ if (in.readBoolean()) {
+ pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class));
+ } else {
+ BytesReference type = in.readBytesReference();
+ PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
+ pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
+ }
}
this.pipelineAggregators = pipelineAggregators;
}
@@ -273,8 +278,16 @@ public class QuerySearchResult extends QuerySearchResultProvider {
out.writeBoolean(true);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
- out.writeBytesReference(pipelineAggregator.type().stream());
- pipelineAggregator.writeTo(out);
+ // NORELEASE temporary hack to support old style streams and new style NamedWriteable
+ try {
+ pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
+ out.writeBoolean(true);
+ out.writeNamedWriteable(pipelineAggregator);
+ } catch (UnsupportedOperationException e) {
+ out.writeBoolean(false);
+ out.writeBytesReference(pipelineAggregator.type().stream());
+ pipelineAggregator.writeTo(out);
+ }
}
}
if (suggest == null) {