diff options
author | Nik Everett <nik9000@gmail.com> | 2016-07-18 11:54:31 -0400 |
---|---|---|
committer | Nik Everett <nik9000@gmail.com> | 2016-07-19 14:43:29 -0400 |
commit | 9e2221cae5e949a7574ccad333921ad6c9455c39 (patch) | |
tree | 90b9317702240333744101f3a42ac39092e5c0b6 | |
parent | 11389638f9393e854bd99513964a8c01f4c8db07 (diff) |
Migrate remaining aggregations to NamedWriteable
After this we'll be able to remove AggregationStreams and
PipelineAggregatorStreams.
24 files changed, 256 insertions, 459 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 005d2ab664..e8ddc5298a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -211,6 +211,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.InternalPercentilesBucket; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.InternalStatsBucket; @@ -437,18 +438,16 @@ public class SearchModule extends AbstractModule { pipelineAggregationParserRegistry.register(spec.parser, spec.name); } namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader); - for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) { - namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue()); - } - for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) { - namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue()); + namedWriteableRegistry.register(PipelineAggregator.class, spec.name.getPreferredName(), spec.aggregatorReader); + for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.resultReaders.entrySet()) { + namedWriteableRegistry.register(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()); } } public static class PipelineAggregationSpec { - private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>(); - private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>(); + private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>(); private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader; + private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader; private final PipelineAggregator.Parser parser; private final ParseField name; @@ -456,13 +455,16 @@ public class SearchModule extends AbstractModule { * Register a pipeline aggregation. * * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream + * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is * registered under. */ public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader, + Writeable.Reader<? extends PipelineAggregator> aggregatorReader, PipelineAggregator.Parser parser, ParseField name) { this.builderReader = builderReader; + this.aggregatorReader = aggregatorReader; this.parser = parser; this.name = name; } @@ -471,33 +473,17 @@ public class SearchModule extends AbstractModule { * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as * the {@link NamedWriteable#getWriteableName()}. */ - public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) { + public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) { return addResultReader(name.getPreferredName(), resultReader); } /** * Add a reader for the shard level results of the aggregation. */ - public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) { + public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) { resultReaders.put(writeableName, resultReader); return this; } - - /** - * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()} - * as the {@link NamedWriteable#getWriteableName()}. - */ - public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) { - return addBucketReader(name.getPreferredName(), resultReader); - } - - /** - * Add a reader for the shard level results of the aggregation. - */ - public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) { - bucketReaders.put(writeableName, resultReader); - return this; - } } public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, @@ -606,49 +592,80 @@ public class SearchModule extends AbstractModule { registerPipelineAggregation(new PipelineAggregationSpec( DerivativePipelineAggregationBuilder::new, + DerivativePipelineAggregator::new, DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD) - .addResultReader(DerivativePipelineAggregator::new) - .addBucketReader(InternalDerivative::new)); - registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER, - MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER, - MinBucketPipelineAggregationBuilder.AGGREGATION_FIELD_NAME); - registerPipelineAggregation(AvgBucketPipelineAggregationBuilder::new, AvgBucketPipelineAggregationBuilder.PARSER, - AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER, - SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + .addResultReader(InternalDerivative::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + MaxBucketPipelineAggregationBuilder::new, + MaxBucketPipelineAggregator::new, + MaxBucketPipelineAggregationBuilder.PARSER, + MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + // This bucket is used by many pipeline aggreations. + .addResultReader(InternalBucketMetricValue.NAME, InternalBucketMetricValue::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + MinBucketPipelineAggregationBuilder::new, + MinBucketPipelineAggregator::new, + MinBucketPipelineAggregationBuilder.PARSER, + MinBucketPipelineAggregationBuilder.AGGREGATION_FIELD_NAME) + /* Uses InternalBucketMetricValue */); + registerPipelineAggregation(new PipelineAggregationSpec( + AvgBucketPipelineAggregationBuilder::new, + AvgBucketPipelineAggregator::new, + AvgBucketPipelineAggregationBuilder.PARSER, + AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + // This bucket is used by many pipeline aggreations. + .addResultReader(InternalSimpleValue.NAME, InternalSimpleValue::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + SumBucketPipelineAggregationBuilder::new, + SumBucketPipelineAggregator::new, + SumBucketPipelineAggregationBuilder.PARSER, + SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + /* Uses InternalSimpleValue */); registerPipelineAggregation(new PipelineAggregationSpec( StatsBucketPipelineAggregationBuilder::new, + StatsBucketPipelineAggregator::new, StatsBucketPipelineAggregationBuilder.PARSER, StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) - .addResultReader(StatsBucketPipelineAggregator::new) - .addBucketReader(InternalStatsBucket::new)); + .addResultReader(InternalStatsBucket::new)); registerPipelineAggregation(new PipelineAggregationSpec( ExtendedStatsBucketPipelineAggregationBuilder::new, + ExtendedStatsBucketPipelineAggregator::new, new ExtendedStatsBucketParser(), ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) - .addResultReader(ExtendedStatsBucketPipelineAggregator::new) - .addBucketReader(InternalExtendedStatsBucket::new)); - registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER, - PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + .addResultReader(InternalExtendedStatsBucket::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + PercentilesBucketPipelineAggregationBuilder::new, + PercentilesBucketPipelineAggregator::new, + PercentilesBucketPipelineAggregationBuilder.PARSER, + PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(InternalPercentilesBucket::new)); registerPipelineAggregation(new PipelineAggregationSpec( MovAvgPipelineAggregationBuilder::new, + MovAvgPipelineAggregator::new, (n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c), MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME) - .addResultReader(MovAvgPipelineAggregator::new) /* Uses InternalHistogram for buckets */); - registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse, - CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse, - BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(BucketSelectorPipelineAggregationBuilder::new, BucketSelectorPipelineAggregationBuilder::parse, - BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(new PipelineAggregationSpec( + CumulativeSumPipelineAggregationBuilder::new, + CumulativeSumPipelineAggregator::new, + CumulativeSumPipelineAggregationBuilder::parse, + CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)); + registerPipelineAggregation(new PipelineAggregationSpec( + BucketScriptPipelineAggregationBuilder::new, + BucketScriptPipelineAggregator::new, + BucketScriptPipelineAggregationBuilder::parse, + BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)); + registerPipelineAggregation(new PipelineAggregationSpec( + BucketSelectorPipelineAggregationBuilder::new, + BucketSelectorPipelineAggregator::new, + BucketSelectorPipelineAggregationBuilder::parse, + BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)); registerPipelineAggregation(new PipelineAggregationSpec( SerialDiffPipelineAggregationBuilder::new, + SerialDiffPipelineAggregator::new, SerialDiffPipelineAggregationBuilder::parse, - SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) - .addResultReader(SerialDiffPipelineAggregator::new)); + SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)); } protected void configureSearch() { @@ -879,18 +896,4 @@ public class SearchModule extends AbstractModule { registerQuery(GeoShapeQueryBuilder::new, GeoShapeQueryBuilder::fromXContent, GeoShapeQueryBuilder.QUERY_NAME_FIELD); } } - - static { - // Pipeline Aggregations - InternalSimpleValue.registerStreams(); - InternalBucketMetricValue.registerStreams(); - MaxBucketPipelineAggregator.registerStreams(); - MinBucketPipelineAggregator.registerStreams(); - AvgBucketPipelineAggregator.registerStreams(); - SumBucketPipelineAggregator.registerStreams(); - PercentilesBucketPipelineAggregator.registerStreams(); - CumulativeSumPipelineAggregator.registerStreams(); - BucketScriptPipelineAggregator.registerStreams(); - BucketSelectorPipelineAggregator.registerStreams(); - } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java index 01ac5778f5..b9d79b8e32 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.max.InternalMax; @@ -33,26 +32,8 @@ import java.util.List; import java.util.Map; public class InternalSimpleValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue { - - public static final Type TYPE = new Type("simple_value"); - - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalSimpleValue readResult(StreamInput in) throws IOException { - InternalSimpleValue result = new InternalSimpleValue(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - - private double value; - - protected InternalSimpleValue() { // NORELEASE remove and make value final if possible - } // for serialization + public static final String NAME = "simple_value"; + private final double value; public InternalSimpleValue(String name, double value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { @@ -71,18 +52,17 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl } @Override - protected void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - value = in.readDouble(); - } - - @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeNamedWriteable(format); out.writeDouble(value); } @Override + public String getWriteableName() { + return NAME; + } + + @Override public double value() { return value; } @@ -92,11 +72,6 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl } @Override - public Type type() { - return TYPE; - } - - @Override public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index f69e81fc8b..79ec761b0e 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -37,7 +37,7 @@ import java.util.stream.StreamSupport; public abstract class SiblingPipelineAggregator extends PipelineAggregator { - protected SiblingPipelineAggregator() { // for Serialisation + protected SiblingPipelineAggregator() { // NOCOMMIT remove me super(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java index c09d303f4e..6bdbd5e6e4 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java @@ -45,12 +45,8 @@ import java.util.Map; */ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAggregator { - protected DocValueFormat format; - protected GapPolicy gapPolicy; - - public BucketMetricsPipelineAggregator() { - super(); - } + protected final DocValueFormat format; + protected final GapPolicy gapPolicy; protected BucketMetricsPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat format, Map<String, Object> metaData) { @@ -69,16 +65,6 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg } @Override - public final void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - innerReadFrom(in); - } - - protected void innerReadFrom(StreamInput in) throws IOException { - } - - @Override public final void doWriteTo(StreamOutput out) throws IOException { out.writeNamedWriteable(format); gapPolicy.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java index 683620b672..6477728b32 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -33,30 +32,11 @@ import java.util.List; import java.util.Map; public class InternalBucketMetricValue extends InternalNumericMetricsAggregation.SingleValue { - - public static final Type TYPE = new Type("bucket_metric_value"); - - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalBucketMetricValue readResult(StreamInput in) throws IOException { - InternalBucketMetricValue result = new InternalBucketMetricValue(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } + public static final String NAME = "bucket_metric_value"; private double value; - private String[] keys; - protected InternalBucketMetricValue() { - super(); - } - public InternalBucketMetricValue(String name, String[] keys, double value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); @@ -65,9 +45,26 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation this.format = formatter; } + /** + * Read from a stream. + */ + public InternalBucketMetricValue(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + value = in.readDouble(); + keys = in.readStringArray(); + } + @Override - public Type type() { - return TYPE; + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeDouble(value); + out.writeStringArray(keys); + } + + @Override + public String getWriteableName() { + return NAME; } @Override @@ -98,20 +95,6 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation } @Override - protected void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - value = in.readDouble(); - keys = in.readStringArray(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - out.writeDouble(value); - out.writeStringArray(keys); - } - - @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { boolean hasValue = !Double.isInfinite(value); builder.field(CommonFields.VALUE, hasValue ? value : null); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java index d7d7a1e0c5..95a62d388a 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java @@ -34,11 +34,11 @@ import java.util.List; import java.util.Map; public class AvgBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<AvgBucketPipelineAggregationBuilder> { - public static final String NAME = AvgBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "avg_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public AvgBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, AvgBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java index 56d4ed4170..776862a48d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java @@ -22,11 +22,9 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -34,36 +32,24 @@ import java.util.List; import java.util.Map; public class AvgBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public static final Type TYPE = new Type("avg_bucket"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public AvgBucketPipelineAggregator readResult(StreamInput in) throws IOException { - AvgBucketPipelineAggregator result = new AvgBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - private int count = 0; private double sum = 0; - private AvgBucketPipelineAggregator() { - } - protected AvgBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat format, Map<String, Object> metaData) { super(name, bucketsPaths, gapPolicy, format, metaData); } + /** + * Read from a stream. + */ + public AvgBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return AvgBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java index 0c4fc632b9..7f517a6dc1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java @@ -34,11 +34,11 @@ import java.util.List; import java.util.Map; public class MaxBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<MaxBucketPipelineAggregationBuilder> { - public static final String NAME = MaxBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "max_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public MaxBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, MaxBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java index ac3cd06958..d17a592c34 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java @@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue; @@ -36,36 +34,24 @@ import java.util.List; import java.util.Map; public class MaxBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public static final Type TYPE = new Type("max_bucket"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public MaxBucketPipelineAggregator readResult(StreamInput in) throws IOException { - MaxBucketPipelineAggregator result = new MaxBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - private List<String> maxBucketKeys; private double maxValue; - private MaxBucketPipelineAggregator() { - } - protected MaxBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter, Map<String, Object> metaData) { super(name, bucketsPaths, gapPolicy, formatter, metaData); } + /** + * Read from a stream. + */ + public MaxBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return MaxBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java index d20dbe9f7f..5c45c73707 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java @@ -34,11 +34,11 @@ import java.util.List; import java.util.Map; public class MinBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<MinBucketPipelineAggregationBuilder> { - public static final String NAME = MinBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "min_bucket"; public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME); public MinBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, MinBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java index e4b86ee9f8..88595d7826 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java @@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue; @@ -36,36 +34,24 @@ import java.util.List; import java.util.Map; public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public static final Type TYPE = new Type("min_bucket"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public MinBucketPipelineAggregator readResult(StreamInput in) throws IOException { - MinBucketPipelineAggregator result = new MinBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - private List<String> minBucketKeys; private double minValue; - private MinBucketPipelineAggregator() { - } - protected MinBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter, Map<String, Object> metaData) { super(name, bucketsPaths, gapPolicy, formatter, metaData); } + /** + * Read from a stream. + */ + public MinBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return MinBucketPipelineAggregationBuilder.NAME; } @Override @@ -86,8 +72,7 @@ public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator } @Override - protected InternalAggregation buildAggregation(java.util.List<PipelineAggregator> pipelineAggregators, - java.util.Map<String, Object> metadata) { + protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) { String[] keys = minBucketKeys.toArray(new String[minBucketKeys.size()]); return new InternalBucketMetricValue(name(), keys, minValue, format, Collections.emptyList(), metaData()); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java index badbb9411a..059d72a8e1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.max.InternalMax; @@ -38,28 +37,9 @@ import java.util.List; import java.util.Map; public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket { - - public static final Type TYPE = new Type("percentiles_bucket"); - - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalPercentilesBucket readResult(StreamInput in) throws IOException { - InternalPercentilesBucket result = new InternalPercentilesBucket(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - private double[] percentiles; private double[] percents; - protected InternalPercentilesBucket() { - } // for serialization - public InternalPercentilesBucket(String name, double[] percents, double[] percentiles, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { @@ -69,6 +49,28 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation this.percents = percents; } + /** + * Read from a stream. + */ + public InternalPercentilesBucket(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + percentiles = in.readDoubleArray(); + percents = in.readDoubleArray(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeDoubleArray(percentiles); + out.writeDoubleArray(percents); + } + + @Override + public String getWriteableName() { + return PercentilesBucketPipelineAggregationBuilder.NAME; + } + @Override public double percentile(double percent) throws IllegalArgumentException { int index = Arrays.binarySearch(percents, percent); @@ -95,30 +97,11 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation } @Override - public Type type() { - return TYPE; - } - - @Override public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } @Override - protected void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - percentiles = in.readDoubleArray(); - percents = in.readDoubleArray(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - out.writeDoubleArray(percentiles); - out.writeDoubleArray(percents); - } - - @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startObject("values"); for (double percent : percents) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java index 06a1b06dff..d0f0e29bdb 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java @@ -40,7 +40,7 @@ import java.util.Objects; public class PercentilesBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<PercentilesBucketPipelineAggregationBuilder> { - public static final String NAME = PercentilesBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "percentiles_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private static final ParseField PERCENTS_FIELD = new ParseField("percents"); @@ -48,7 +48,7 @@ public class PercentilesBucketPipelineAggregationBuilder private double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 }; public PercentilesBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, PercentilesBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java index 12769680e9..2818d0f3f5 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java @@ -24,10 +24,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -37,39 +35,33 @@ import java.util.List; import java.util.Map; public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public static final Type TYPE = new Type("percentiles_bucket"); public final ParseField PERCENTS_FIELD = new ParseField("percents"); - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException { - PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - InternalPercentilesBucket.registerStreams(); - } - - private double[] percents; + private final double[] percents; private List<Double> data; - private PercentilesBucketPipelineAggregator() { - } - protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter, Map<String, Object> metaData) { super(name, bucketsPaths, gapPolicy, formatter, metaData); this.percents = percents; } + /** + * Read from a stream. + */ + public PercentilesBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + percents = in.readDoubleArray(); + } + @Override - public Type type() { - return TYPE; + public void innerWriteTo(StreamOutput out) throws IOException { + out.writeDoubleArray(percents); + } + + @Override + public String getWriteableName() { + return PercentilesBucketPipelineAggregationBuilder.NAME; } @Override @@ -105,15 +97,4 @@ public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAg return new InternalPercentilesBucket(name(), percents, percentiles, format, pipelineAggregators, metadata); } - - @Override - public void innerReadFrom(StreamInput in) throws IOException { - percents = in.readDoubleArray(); - } - - @Override - public void innerWriteTo(StreamOutput out) throws IOException { - out.writeDoubleArray(percents); - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java index cf20eaf6e7..669700cbf7 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java @@ -34,11 +34,11 @@ import java.util.List; import java.util.Map; public class SumBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<SumBucketPipelineAggregationBuilder> { - public static final String NAME = SumBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "sum_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public SumBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, SumBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java index 89d0ca9ccf..7efbf401d0 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java @@ -22,11 +22,9 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -34,35 +32,23 @@ import java.util.List; import java.util.Map; public class SumBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public static final Type TYPE = new Type("sum_bucket"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public SumBucketPipelineAggregator readResult(StreamInput in) throws IOException { - SumBucketPipelineAggregator result = new SumBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - private double sum = 0; - private SumBucketPipelineAggregator() { - } - protected SumBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter, Map<String, Object> metaData) { super(name, bucketsPaths, gapPolicy, formatter, metaData); } + /** + * Read from a stream. + */ + public SumBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return SumBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java index 1e285c24a1..4925de9c77 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java @@ -47,7 +47,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator. import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketScriptPipelineAggregationBuilder> { - public static final String NAME = BucketScriptPipelineAggregator.TYPE.name(); + public static final String NAME = "bucket_script"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private final Script script; @@ -56,8 +56,7 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr private GapPolicy gapPolicy = GapPolicy.SKIP; public BucketScriptPipelineAggregationBuilder(String name, Map<String, String> bucketsPathsMap, Script script) { - super(name, BucketScriptPipelineAggregator.TYPE.name(), new TreeMap<>(bucketsPathsMap).values() - .toArray(new String[bucketsPathsMap.size()])); + super(name, NAME, new TreeMap<>(bucketsPathsMap).values().toArray(new String[bucketsPathsMap.size()])); this.bucketsPathsMap = bucketsPathsMap; this.script = script; } @@ -70,7 +69,7 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr * Read from a stream. */ public BucketScriptPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, BucketScriptPipelineAggregator.TYPE.name()); + super(in, NAME); int mapSize = in.readVInt(); bucketsPathsMap = new HashMap<String, String>(mapSize); for (int i = 0; i < mapSize; i++) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java index 35893c7d09..008b0217c6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java @@ -29,14 +29,12 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import java.io.IOException; import java.util.ArrayList; @@ -50,28 +48,10 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class BucketScriptPipelineAggregator extends PipelineAggregator { - - public static final Type TYPE = new Type("bucket_script"); - - public static final PipelineAggregatorStreams.Stream STREAM = in -> { - BucketScriptPipelineAggregator result = new BucketScriptPipelineAggregator(); - result.readFrom(in); - return result; - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - - private DocValueFormat formatter; - private GapPolicy gapPolicy; - - private Script script; - - private Map<String, String> bucketsPathsMap; - - public BucketScriptPipelineAggregator() { - } + private final DocValueFormat formatter; + private final GapPolicy gapPolicy; + private final Script script; + private final Map<String, String> bucketsPathsMap; public BucketScriptPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, DocValueFormat formatter, GapPolicy gapPolicy, Map<String, Object> metadata) { @@ -82,9 +62,29 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } + /** + * Read from a stream. + */ + @SuppressWarnings("unchecked") + public BucketScriptPipelineAggregator(StreamInput in) throws IOException { + super(in); + script = new Script(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + bucketsPathsMap = (Map<String, String>) in.readGenericValue(); + } + @Override - public Type type() { - return TYPE; + protected void doWriteTo(StreamOutput out) throws IOException { + script.writeTo(out); + out.writeNamedWriteable(formatter); + gapPolicy.writeTo(out); + out.writeGenericValue(bucketsPathsMap); + } + + @Override + public String getWriteableName() { + return BucketScriptPipelineAggregationBuilder.NAME; } @Override @@ -136,22 +136,4 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator { } return originalAgg.create(newBuckets); } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @SuppressWarnings("unchecked") - @Override - protected void doReadFrom(StreamInput in) throws IOException { - script = new Script(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map<String, String>) in.readGenericValue(); - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java index 9054812ed3..c9b423eedc 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java @@ -45,7 +45,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator. import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketSelectorPipelineAggregationBuilder> { - public static final String NAME = BucketSelectorPipelineAggregator.TYPE.name(); + public static final String NAME = "bucket_selector"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private final Map<String, String> bucketsPathsMap; @@ -53,8 +53,7 @@ public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAg private GapPolicy gapPolicy = GapPolicy.SKIP; public BucketSelectorPipelineAggregationBuilder(String name, Map<String, String> bucketsPathsMap, Script script) { - super(name, BucketSelectorPipelineAggregator.TYPE.name(), new TreeMap<>(bucketsPathsMap).values() - .toArray(new String[bucketsPathsMap.size()])); + super(name, NAME, new TreeMap<>(bucketsPathsMap).values().toArray(new String[bucketsPathsMap.size()])); this.bucketsPathsMap = bucketsPathsMap; this.script = script; } @@ -67,7 +66,7 @@ public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAg * Read from a stream. */ public BucketSelectorPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, BucketSelectorPipelineAggregator.TYPE.name()); + super(in, NAME); int mapSize = in.readVInt(); bucketsPathsMap = new HashMap<String, String>(mapSize); for (int i = 0; i < mapSize; i++) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java index 7fff7f04db..eabbad7213 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java @@ -28,12 +28,10 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import java.io.IOException; import java.util.ArrayList; @@ -45,31 +43,12 @@ import java.util.Map; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class BucketSelectorPipelineAggregator extends PipelineAggregator { - - public static final Type TYPE = new Type("bucket_selector"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public BucketSelectorPipelineAggregator readResult(StreamInput in) throws IOException { - BucketSelectorPipelineAggregator result = new BucketSelectorPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - private GapPolicy gapPolicy; private Script script; private Map<String, String> bucketsPathsMap; - public BucketSelectorPipelineAggregator() { - } - public BucketSelectorPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, GapPolicy gapPolicy, Map<String, Object> metadata) { super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata); @@ -78,9 +57,27 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } + /** + * Read from a stream. + */ + @SuppressWarnings("unchecked") + public BucketSelectorPipelineAggregator(StreamInput in) throws IOException { + super(in); + script = new Script(in); + gapPolicy = GapPolicy.readFrom(in); + bucketsPathsMap = (Map<String, String>) in.readGenericValue(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + script.writeTo(out); + gapPolicy.writeTo(out); + out.writeGenericValue(bucketsPathsMap); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return BucketSelectorPipelineAggregationBuilder.NAME; } @Override @@ -119,20 +116,4 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator { } return originalAgg.create(newBuckets); } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @SuppressWarnings("unchecked") - @Override - protected void doReadFrom(StreamInput in) throws IOException { - script = new Script(in); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map<String, String>) in.readGenericValue(); - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java index 1927317fc2..898c5711c8 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java @@ -44,20 +44,20 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator. import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; public class CumulativeSumPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<CumulativeSumPipelineAggregationBuilder> { - public static final String NAME = CumulativeSumPipelineAggregator.TYPE.name(); + public static final String NAME = "cumulative_sum"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private String format; public CumulativeSumPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, CumulativeSumPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** * Read from a stream. */ public CumulativeSumPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, CumulativeSumPipelineAggregator.TYPE.name()); + super(in, NAME); format = in.readOptionalString(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java index 9ba381f2a5..b9a0033272 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java @@ -24,13 +24,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import java.io.IOException; import java.util.ArrayList; @@ -42,23 +40,7 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class CumulativeSumPipelineAggregator extends PipelineAggregator { - - public static final Type TYPE = new Type("cumulative_sum"); - - public static final PipelineAggregatorStreams.Stream STREAM = in -> { - CumulativeSumPipelineAggregator result = new CumulativeSumPipelineAggregator(); - result.readFrom(in); - return result; - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - - private DocValueFormat formatter; - - public CumulativeSumPipelineAggregator() { - } + private final DocValueFormat formatter; public CumulativeSumPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, Map<String, Object> metadata) { @@ -66,9 +48,22 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator { this.formatter = formatter; } + /** + * Read from a stream. + */ + public CumulativeSumPipelineAggregator(StreamInput in) throws IOException { + super(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(formatter); + } + @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return CumulativeSumPipelineAggregationBuilder.NAME; } @Override @@ -92,14 +87,4 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator { } return factory.create(newBuckets, histo); } - - @Override - public void doReadFrom(StreamInput in) throws IOException { - formatter = in.readNamedWriteable(DocValueFormat.class); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java index 64dd0f87ca..b27d4a5a4c 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java @@ -1,5 +1,3 @@ -package org.elasticsearch.search.aggregations.pipeline; - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -19,7 +17,8 @@ package org.elasticsearch.search.aggregations.pipeline; * under the License. */ -import org.elasticsearch.ElasticsearchException; +package org.elasticsearch.search.aggregations.pipeline; + import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -45,7 +44,6 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.core.IsNull.notNullValue; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java index e7c478b23f..71bf937fbb 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryParseContext; -import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder; import static org.hamcrest.Matchers.equalTo; @@ -56,7 +55,7 @@ public class PercentilesBucketTests extends AbstractBucketMetricsTestCase<Percen parser.nextToken(); // skip object start PercentilesBucketPipelineAggregationBuilder builder = (PercentilesBucketPipelineAggregationBuilder) aggParsers - .pipelineParser(PercentilesBucketPipelineAggregator.TYPE.name(), parseFieldMatcher) + .pipelineParser(PercentilesBucketPipelineAggregationBuilder.NAME, parseFieldMatcher) .parse("test", parseContext); assertThat(builder.percents(), equalTo(new double[]{0.0, 20.0, 50.0, 75.99})); |