diff options
author | Nik Everett <nik9000@gmail.com> | 2016-06-30 15:19:58 -0400 |
---|---|---|
committer | Nik Everett <nik9000@gmail.com> | 2016-07-01 09:13:15 -0400 |
commit | 91b66e3cf488cc2961bab106de68eae6b2b3cf5c (patch) | |
tree | 907664ed6946321464bd90ea4d9d298cfc91f380 /core | |
parent | 1297a707da7bdc97dfdacb59f23dd1e918c14d63 (diff) |
Migration stats and extended stats to NamedWriteable
Migrates the `stats` and `extended_stats` aggregations and pipeline
aggregations from the special purpose aggregations streams to
`NamedWriteable`. These are the first pipeline aggregations so this
adds the infrastructure to support both streams and `NamedWriteable`s
for pipeline aggregations.
Diffstat (limited to 'core')
17 files changed, 277 insertions, 294 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index cd9d6f8a3f..ee05a72e40 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -91,8 +91,8 @@ import org.elasticsearch.index.query.functionscore.ScriptScoreFunctionBuilder; import org.elasticsearch.index.query.functionscore.WeightBuilder; import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.search.action.SearchTransportService; -import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorParsers; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; @@ -119,6 +119,7 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalNested; import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange; import org.elasticsearch.search.aggregations.bucket.range.InternalRange; import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.RangeParser; @@ -129,7 +130,6 @@ import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanc import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser; import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance; import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange; import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeParser; import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedSamplerParser; @@ -200,40 +200,42 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.InternalStatsBucket; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketParser; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.InternalExtendedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; -import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; @@ -445,16 +447,31 @@ public class SearchModule extends AbstractModule { * Register a pipeline aggregation. * * @param reader reads the aggregation builder from a stream + * @param internalReader reads the {@link PipelineAggregator} from a stream + * @param internalReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream * @param aggregationParser reads the aggregation builder from XContent * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader * is registered under. */ public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, + Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader, PipelineAggregator.Parser aggregationParser, ParseField aggregationName) { + if (false == transportClient) { + namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader); + pipelineAggregationParserRegistry.register(aggregationParser, aggregationName); + } + namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader); + namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader); + } + + public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, + PipelineAggregator.Parser aggregationParser, ParseField aggregationName) { + // NORELEASE remove me in favor of the above method pipelineAggregationParserRegistry.register(aggregationParser, aggregationName); namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader); } + @Override protected void configure() { if (false == transportClient) { @@ -475,9 +492,10 @@ public class SearchModule extends AbstractModule { registerAggregation(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD); - registerAggregation(StatsAggregationBuilder::new, new StatsParser(), StatsAggregationBuilder.AGGREGATION_NAME_FIELD); - registerAggregation(ExtendedStatsAggregationBuilder::new, new ExtendedStatsParser(), - ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD); + registerAggregation(StatsAggregationBuilder::new, InternalStats::new, new StatsParser(), + StatsAggregationBuilder.AGGREGATION_NAME_FIELD); + registerAggregation(ExtendedStatsAggregationBuilder::new, InternalExtendedStats::new, new ExtendedStatsParser(), + ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(ValueCountAggregationBuilder::new, new ValueCountParser(), ValueCountAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(PercentilesAggregationBuilder::new, new PercentilesParser(), PercentilesAggregationBuilder.AGGREGATION_NAME_FIELD); @@ -522,6 +540,7 @@ public class SearchModule extends AbstractModule { ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse, ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER, @@ -532,9 +551,11 @@ public class SearchModule extends AbstractModule { AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER, SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregationBuilder.PARSER, + registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new, + InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER, StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, new ExtendedStatsBucketParser(), + registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new, + InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(), ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER, PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); @@ -703,8 +724,6 @@ public class SearchModule extends AbstractModule { InternalSum.registerStreams(); InternalMin.registerStreams(); InternalMax.registerStreams(); - InternalStats.registerStreams(); - InternalExtendedStats.registerStreams(); InternalValueCount.registerStreams(); InternalTDigestPercentiles.registerStreams(); InternalTDigestPercentileRanks.registerStreams(); @@ -749,8 +768,6 @@ public class SearchModule extends AbstractModule { MinBucketPipelineAggregator.registerStreams(); AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); - StatsBucketPipelineAggregator.registerStreams(); - ExtendedStatsBucketPipelineAggregator.registerStreams(); PercentilesBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index b75244ce88..93ecfa975b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -151,9 +151,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St } else { pipelineAggregators = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add(pipelineAggregator); + if (in.readBoolean()) { + pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class)); + } else { + BytesReference type = in.readBytesReference(); + PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); + pipelineAggregators.add(pipelineAggregator); + } } } } @@ -174,9 +178,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St } else { pipelineAggregators = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add(pipelineAggregator); + if (in.readBoolean()) { + pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class)); + } else { + BytesReference type = in.readBytesReference(); + PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); + pipelineAggregators.add(pipelineAggregator); + } } } doReadFrom(in); @@ -188,12 +196,20 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St @Override public final void writeTo(StreamOutput out) throws IOException { - out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable + out.writeString(name); // NORELEASE remote writing the name? it is automatically handled with writeNamedWriteable out.writeGenericValue(metaData); out.writeVInt(pipelineAggregators.size()); for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - out.writeBytesReference(pipelineAggregator.type().stream()); - pipelineAggregator.writeTo(out); + // NORELEASE temporary hack to support old style streams and new style NamedWriteable + try { + pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams. + out.writeBoolean(true); + out.writeNamedWriteable(pipelineAggregator); + } catch (UnsupportedOperationException e) { + out.writeBoolean(false); + out.writeBytesReference(pipelineAggregator.type().stream()); + pipelineAggregator.writeTo(out); + } } doWriteTo(out); } @@ -201,21 +217,22 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St protected abstract void doWriteTo(StreamOutput out) throws IOException; @Override - public String getName() { - return name; - } - - @Override public String getWriteableName() { // NORELEASE remove me when all InternalAggregations override it throw new UnsupportedOperationException("Override on every class"); } + @Override + public String getName() { + return name; + } + /** * @return The {@link Type} of this aggregation */ public Type type() { - throw new UnsupportedOperationException("Use getWriteableName instead"); // NORELEASE remove me + // NORELEASE remove this method + throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead"); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java index cbbb349310..e060826c24 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -31,26 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -/** -* -*/ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue implements Stats { - - public final static Type TYPE = new Type("stats"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalStats readResult(StreamInput in) throws IOException { - InternalStats result = new InternalStats(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - enum Metrics { count, sum, min, max, avg; @@ -60,12 +40,10 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue } } - protected long count; - protected double min; - protected double max; - protected double sum; - - protected InternalStats() {} // for serialization + protected final long count; + protected final double min; + protected final double max; + protected final double sum; public InternalStats(String name, long count, double sum, double min, double max, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, @@ -78,6 +56,36 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue this.format = formatter; } + /** + * Read from a stream. + */ + public InternalStats(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + count = in.readVLong(); + min = in.readDouble(); + max = in.readDouble(); + sum = in.readDouble(); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeVLong(count); + out.writeDouble(min); + out.writeDouble(max); + out.writeDouble(sum); + writeOtherStatsTo(out); + } + + protected void writeOtherStatsTo(StreamOutput out) throws IOException { + } + + @Override + public String getWriteableName() { + return StatsAggregationBuilder.NAME; + } + @Override public long getCount() { return count; @@ -129,11 +137,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue } @Override - public Type type() { - return TYPE; - } - - @Override public double value(String name) { Metrics metrics = Metrics.valueOf(name); switch (metrics) { @@ -163,32 +166,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue return new InternalStats(name, count, sum, min, max, format, pipelineAggregators(), getMetaData()); } - @Override - protected void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - count = in.readVLong(); - min = in.readDouble(); - max = in.readDouble(); - sum = in.readDouble(); - readOtherStatsFrom(in); - } - - public void readOtherStatsFrom(StreamInput in) throws IOException { - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - out.writeVLong(count); - out.writeDouble(min); - out.writeDouble(max); - out.writeDouble(sum); - writeOtherStatsTo(out); - } - - protected void writeOtherStatsTo(StreamOutput out) throws IOException { - } - static class Fields { public static final String COUNT = "count"; public static final String MIN = "min"; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java index 4fd80e0449..cac1d92101 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/StatsAggregationBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValueType; @@ -36,18 +37,19 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType; import java.io.IOException; public class StatsAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, StatsAggregationBuilder> { - public static final String NAME = InternalStats.TYPE.name(); + public static final String NAME = "stats"; + private final static Type TYPE = new Type(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public StatsAggregationBuilder(String name) { - super(name, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); + super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); } /** * Read from a stream. */ public StatsAggregationBuilder(StreamInput in) throws IOException { - super(in, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); + super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java index a2b961f1fc..a28508c217 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/ExtendedStatsAggregationBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValueType; @@ -38,20 +39,21 @@ import java.util.Objects; public class ExtendedStatsAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, ExtendedStatsAggregationBuilder> { - public static final String NAME = InternalExtendedStats.TYPE.name(); + public static final String NAME = "extended_stats"; + public final static Type TYPE = new Type(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private double sigma = 2.0; public ExtendedStatsAggregationBuilder(String name) { - super(name, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); + super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); } /** * Read from a stream. */ public ExtendedStatsAggregationBuilder(StreamInput in) throws IOException { - super(in, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); + super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); sigma = in.readDouble(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java index eff72797b5..d848001171 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java @@ -18,12 +18,10 @@ */ package org.elasticsearch.search.aggregations.metrics.stats.extended; -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.stats.InternalStats; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -32,26 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -/** -* -*/ public class InternalExtendedStats extends InternalStats implements ExtendedStats { - - public final static Type TYPE = new Type("extended_stats", "estats"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalExtendedStats readResult(StreamInput in) throws IOException { - InternalExtendedStats result = new InternalExtendedStats(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - enum Metrics { count, sum, min, max, avg, sum_of_squares, variance, std_deviation, std_upper, std_lower; @@ -61,10 +40,8 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat } } - private double sumOfSqrs; - private double sigma; - - protected InternalExtendedStats() {} // for serialization + private final double sumOfSqrs; + private final double sigma; public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { @@ -73,9 +50,24 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat this.sigma = sigma; } + /** + * Read from a stream. + */ + public InternalExtendedStats(StreamInput in) throws IOException { + super(in); + sumOfSqrs = in.readDouble(); + sigma = in.readDouble(); + } + @Override - public Type type() { - return TYPE; + protected void writeOtherStatsTo(StreamOutput out) throws IOException { + out.writeDouble(sumOfSqrs); + out.writeDouble(sigma); + } + + @Override + public String getWriteableName() { + return ExtendedStatsAggregationBuilder.NAME; } @Override @@ -157,19 +149,6 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat format, pipelineAggregators(), getMetaData()); } - @Override - public void readOtherStatsFrom(StreamInput in) throws IOException { - sumOfSqrs = in.readDouble(); - sigma = in.readDouble(); - } - - @Override - protected void writeOtherStatsTo(StreamOutput out) throws IOException { - out.writeDouble(sumOfSqrs); - out.writeDouble(sigma); - } - - static class Fields { public static final String SUM_OF_SQRS = "sum_of_squares"; public static final String SUM_OF_SQRS_AS_STRING = "sum_of_squares_as_string"; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 8f7ea955e9..8c43f7460d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -33,7 +34,8 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import java.io.IOException; import java.util.Map; -public abstract class PipelineAggregator implements Streamable { +public abstract class PipelineAggregator implements Streamable, NamedWriteable { + // NORELEASE remove Streamable /** * Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}. @@ -73,25 +75,36 @@ public abstract class PipelineAggregator implements Streamable { this.metaData = metaData; } - public String name() { - return name; + /** + * Read from a stream. + */ + protected PipelineAggregator(StreamInput in) throws IOException { + name = in.readString(); + bucketsPaths = in.readStringArray(); + metaData = in.readMap(); } - public String[] bucketsPaths() { - return bucketsPaths; + @Override + public final void readFrom(StreamInput in) throws IOException { + try { + getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams + assert false : "Used reading constructor instead"; + } catch (UnsupportedOperationException e) { + // OK + } + name = in.readString(); + bucketsPaths = in.readStringArray(); + metaData = in.readMap(); + doReadFrom(in); } - public Map<String, Object> metaData() { - return metaData; + protected void doReadFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable } - public abstract Type type(); - - public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); - @Override public final void writeTo(StreamOutput out) throws IOException { - out.writeString(name); + out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable out.writeStringArray(bucketsPaths); out.writeMap(metaData); doWriteTo(out); @@ -100,12 +113,27 @@ public abstract class PipelineAggregator implements Streamable { protected abstract void doWriteTo(StreamOutput out) throws IOException; @Override - public final void readFrom(StreamInput in) throws IOException { - name = in.readString(); - bucketsPaths = in.readStringArray(); - metaData = in.readMap(); - doReadFrom(in); + public String getWriteableName() { + // NORELEASE remove me when all InternalAggregations override it + throw new UnsupportedOperationException("Override on every class"); + } + + public String name() { + return name; + } + + public String[] bucketsPaths() { + return bucketsPaths; + } + + public Map<String, Object> metaData() { + return metaData; } - protected abstract void doReadFrom(StreamInput in) throws IOException; + public Type type() { + // NORELEASE remove this method + throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead"); + } + + public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index cc4681c903..f69e81fc8b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -27,6 +28,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -43,6 +45,13 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator { super(name, bucketsPaths, metaData); } + /** + * Read from a stream. + */ + protected SiblingPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @SuppressWarnings("unchecked") @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java index 06b093a473..c09d303f4e 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java @@ -59,6 +59,35 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg this.format = format; } + /** + * Read from a stream. + */ + protected BucketMetricsPipelineAggregator(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + } + + @Override + public final void doReadFrom(StreamInput in) throws IOException { + format = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + innerReadFrom(in); + } + + protected void innerReadFrom(StreamInput in) throws IOException { + } + + @Override + public final void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + gapPolicy.writeTo(out); + innerWriteTo(out); + } + + protected void innerWriteTo(StreamOutput out) throws IOException { + } + @Override public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) { preCollection(); @@ -109,25 +138,4 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg * for this bucket */ protected abstract void collectBucketValue(String bucketKey, Double bucketValue); - - @Override - public final void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - innerReadFrom(in); - } - - protected void innerReadFrom(StreamInput in) throws IOException { - } - - @Override - public final void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - gapPolicy.writeTo(out); - innerWriteTo(out); - } - - protected void innerWriteTo(StreamOutput out) throws IOException { - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java index 84f8b26575..371e5bf5e8 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java @@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.stats.InternalStats; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -31,34 +30,21 @@ import java.util.List; import java.util.Map; public class InternalStatsBucket extends InternalStats implements StatsBucket { - - public final static Type TYPE = new Type("stats_bucket"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalStatsBucket readResult(StreamInput in) throws IOException { - InternalStatsBucket result = new InternalStatsBucket(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - public InternalStatsBucket(String name, long count, double sum, double min, double max, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, count, sum, min, max, formatter, pipelineAggregators, metaData); } - InternalStatsBucket() { - // For serialization + /** + * Read from a stream. + */ + public InternalStatsBucket(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return StatsBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java index d3418e897e..9e0432b288 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java @@ -35,11 +35,11 @@ import java.util.List; import java.util.Map; public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<StatsBucketPipelineAggregationBuilder> { - public static final String NAME = StatsBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "stats_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public StatsBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, StatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** @@ -47,7 +47,7 @@ public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipeline */ public StatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, StatsBucketPipelineAggregator.TYPE.name()); + super(in, NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java index 8be8a8462b..4b1febf444 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java @@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -33,23 +31,6 @@ import java.util.List; import java.util.Map; public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public final static Type TYPE = new Type("stats_bucket"); - - public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { - StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - InternalStatsBucket.registerStreams(); - } - private double sum = 0; private long count = 0; private double min = Double.POSITIVE_INFINITY; @@ -60,13 +41,13 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat super(name, bucketsPaths, gapPolicy, formatter, metaData); } - StatsBucketPipelineAggregator() { - // For serialization + public StatsBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return StatsBucketPipelineAggregationBuilder.NAME; } @Override @@ -89,5 +70,4 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) { return new InternalStatsBucket(name(), count, sum, min, max, format, pipelineAggregators, metadata); } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java index 792b9d1f7f..82920be6cf 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java @@ -36,20 +36,20 @@ import java.util.Objects; public class ExtendedStatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<ExtendedStatsBucketPipelineAggregationBuilder> { - public static final String NAME = ExtendedStatsBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "extended_stats_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private double sigma = 2.0; public ExtendedStatsBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** * Read from a stream. */ public ExtendedStatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, ExtendedStatsBucketPipelineAggregator.TYPE.name()); + super(in, NAME); sigma = in.readDouble(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java index d5e8628d6b..03ed20d494 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java @@ -23,10 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -34,29 +32,12 @@ import java.util.List; import java.util.Map; public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public final static Type TYPE = new Type("extended_stats_bucket"); - - public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { - ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - InternalExtendedStatsBucket.registerStreams(); - } - + private final double sigma; private double sum = 0; private long count = 0; private double min = Double.POSITIVE_INFINITY; private double max = Double.NEGATIVE_INFINITY; private double sumOfSqrs = 1; - private double sigma; protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy, DocValueFormat formatter, Map<String, Object> metaData) { @@ -64,13 +45,22 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline this.sigma = sigma; } - ExtendedStatsBucketPipelineAggregator() { - // For Serialization + /** + * Read from a stream. + */ + public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + sigma = in.readDouble(); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeDouble(sigma); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return ExtendedStatsBucketPipelineAggregationBuilder.NAME; } @Override @@ -95,14 +85,4 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) { return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, format, pipelineAggregators, metadata); } - - @Override - protected void innerReadFrom(StreamInput in) throws IOException { - sigma = in.readDouble(); - } - - @Override - protected void innerWriteTo(StreamOutput out) throws IOException { - out.writeDouble(sigma); - } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java index 548e644329..5589a9ebbc 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java @@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.exten import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -31,35 +30,22 @@ import java.util.List; import java.util.Map; public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket { - - public final static Type TYPE = new Type("extended_stats_bucket"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException { - InternalExtendedStatsBucket result = new InternalExtendedStatsBucket(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData); } - InternalExtendedStatsBucket() { - // for serialization + /** + * Read from a stream. + */ + public InternalExtendedStatsBucket(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return ExtendedStatsBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index be8c895eec..4fc814a044 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -224,9 +224,14 @@ public class QuerySearchResult extends QuerySearchResultProvider { int size = in.readVInt(); List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator); + // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time + if (in.readBoolean()) { + pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class)); + } else { + BytesReference type = in.readBytesReference(); + PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); + pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator); + } } this.pipelineAggregators = pipelineAggregators; } @@ -273,8 +278,16 @@ public class QuerySearchResult extends QuerySearchResultProvider { out.writeBoolean(true); out.writeVInt(pipelineAggregators.size()); for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - out.writeBytesReference(pipelineAggregator.type().stream()); - pipelineAggregator.writeTo(out); + // NORELEASE temporary hack to support old style streams and new style NamedWriteable + try { + pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams. + out.writeBoolean(true); + out.writeNamedWriteable(pipelineAggregator); + } catch (UnsupportedOperationException e) { + out.writeBoolean(false); + out.writeBytesReference(pipelineAggregator.type().stream()); + pipelineAggregator.writeTo(out); + } } } if (suggest == null) { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/ExtendedStatsBucketTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/ExtendedStatsBucketTests.java index e1441b0dc5..390501d200 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/ExtendedStatsBucketTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/ExtendedStatsBucketTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryParseContext; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregationBuilder; import static org.hamcrest.Matchers.equalTo; @@ -51,7 +50,7 @@ public class ExtendedStatsBucketTests extends AbstractBucketMetricsTestCase<Exte parser.nextToken(); // skip object start ExtendedStatsBucketPipelineAggregationBuilder builder = (ExtendedStatsBucketPipelineAggregationBuilder) aggParsers - .pipelineParser(ExtendedStatsBucketPipelineAggregator.TYPE.name(), parseFieldMatcher) + .pipelineParser(ExtendedStatsBucketPipelineAggregationBuilder.NAME, parseFieldMatcher) .parse("test", parseContext); assertThat(builder.sigma(), equalTo(5.0)); |