From 91b66e3cf488cc2961bab106de68eae6b2b3cf5c Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 30 Jun 2016 15:19:58 -0400 Subject: Migration stats and extended stats to NamedWriteable Migrates the `stats` and `extended_stats` aggregations and pipeline aggregations from the special purpose aggregations streams to `NamedWriteable`. These are the first pipeline aggregations so this adds the infrastructure to support both streams and `NamedWriteable`s for pipeline aggregations. --- .../aggregations/pipeline/PipelineAggregator.java | 64 ++++++++++++++++------ .../pipeline/SiblingPipelineAggregator.java | 9 +++ .../BucketMetricsPipelineAggregator.java | 50 ++++++++++------- .../bucketmetrics/stats/InternalStatsBucket.java | 28 +++------- .../StatsBucketPipelineAggregationBuilder.java | 6 +- .../stats/StatsBucketPipelineAggregator.java | 28 ++-------- ...endedStatsBucketPipelineAggregationBuilder.java | 6 +- .../ExtendedStatsBucketPipelineAggregator.java | 48 +++++----------- .../extended/InternalExtendedStatsBucket.java | 28 +++------- 9 files changed, 122 insertions(+), 145 deletions(-) (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/pipeline') diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 8f7ea955e9..8c43f7460d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -33,7 +34,8 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import java.io.IOException; import java.util.Map; -public abstract class PipelineAggregator implements Streamable { +public abstract class PipelineAggregator implements Streamable, NamedWriteable { + // NORELEASE remove Streamable /** * Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}. @@ -73,25 +75,36 @@ public abstract class PipelineAggregator implements Streamable { this.metaData = metaData; } - public String name() { - return name; + /** + * Read from a stream. + */ + protected PipelineAggregator(StreamInput in) throws IOException { + name = in.readString(); + bucketsPaths = in.readStringArray(); + metaData = in.readMap(); } - public String[] bucketsPaths() { - return bucketsPaths; + @Override + public final void readFrom(StreamInput in) throws IOException { + try { + getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams + assert false : "Used reading constructor instead"; + } catch (UnsupportedOperationException e) { + // OK + } + name = in.readString(); + bucketsPaths = in.readStringArray(); + metaData = in.readMap(); + doReadFrom(in); } - public Map metaData() { - return metaData; + protected void doReadFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable } - public abstract Type type(); - - public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); - @Override public final void writeTo(StreamOutput out) throws IOException { - out.writeString(name); + out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable out.writeStringArray(bucketsPaths); out.writeMap(metaData); doWriteTo(out); @@ -100,12 +113,27 @@ public abstract class PipelineAggregator implements Streamable { protected abstract void doWriteTo(StreamOutput out) throws IOException; @Override - public final void readFrom(StreamInput in) throws IOException { - name = in.readString(); - bucketsPaths = in.readStringArray(); - metaData = in.readMap(); - doReadFrom(in); + public String getWriteableName() { + // NORELEASE remove me when all InternalAggregations override it + throw new UnsupportedOperationException("Override on every class"); + } + + public String name() { + return name; + } + + public String[] bucketsPaths() { + return bucketsPaths; + } + + public Map metaData() { + return metaData; } - protected abstract void doReadFrom(StreamInput in) throws IOException; + public Type type() { + // NORELEASE remove this method + throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead"); + } + + public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index cc4681c903..f69e81fc8b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -27,6 +28,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -43,6 +45,13 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator { super(name, bucketsPaths, metaData); } + /** + * Read from a stream. + */ + protected SiblingPipelineAggregator(StreamInput in) throws IOException { + super(in); + } + @SuppressWarnings("unchecked") @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java index 06b093a473..c09d303f4e 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java @@ -59,6 +59,35 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg this.format = format; } + /** + * Read from a stream. + */ + protected BucketMetricsPipelineAggregator(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + } + + @Override + public final void doReadFrom(StreamInput in) throws IOException { + format = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + innerReadFrom(in); + } + + protected void innerReadFrom(StreamInput in) throws IOException { + } + + @Override + public final void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + gapPolicy.writeTo(out); + innerWriteTo(out); + } + + protected void innerWriteTo(StreamOutput out) throws IOException { + } + @Override public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) { preCollection(); @@ -109,25 +138,4 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg * for this bucket */ protected abstract void collectBucketValue(String bucketKey, Double bucketValue); - - @Override - public final void doReadFrom(StreamInput in) throws IOException { - format = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - innerReadFrom(in); - } - - protected void innerReadFrom(StreamInput in) throws IOException { - } - - @Override - public final void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - gapPolicy.writeTo(out); - innerWriteTo(out); - } - - protected void innerWriteTo(StreamOutput out) throws IOException { - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java index 84f8b26575..371e5bf5e8 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/InternalStatsBucket.java @@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.stats.InternalStats; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -31,34 +30,21 @@ import java.util.List; import java.util.Map; public class InternalStatsBucket extends InternalStats implements StatsBucket { - - public final static Type TYPE = new Type("stats_bucket"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalStatsBucket readResult(StreamInput in) throws IOException { - InternalStatsBucket result = new InternalStatsBucket(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - public InternalStatsBucket(String name, long count, double sum, double min, double max, DocValueFormat formatter, List pipelineAggregators, Map metaData) { super(name, count, sum, min, max, formatter, pipelineAggregators, metaData); } - InternalStatsBucket() { - // For serialization + /** + * Read from a stream. + */ + public InternalStatsBucket(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return StatsBucketPipelineAggregationBuilder.NAME; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java index d3418e897e..9e0432b288 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregationBuilder.java @@ -35,11 +35,11 @@ import java.util.List; import java.util.Map; public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder { - public static final String NAME = StatsBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "stats_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public StatsBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, StatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** @@ -47,7 +47,7 @@ public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipeline */ public StatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, StatsBucketPipelineAggregator.TYPE.name()); + super(in, NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java index 8be8a8462b..4b1febf444 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/StatsBucketPipelineAggregator.java @@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -33,23 +31,6 @@ import java.util.List; import java.util.Map; public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public final static Type TYPE = new Type("stats_bucket"); - - public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { - StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - InternalStatsBucket.registerStreams(); - } - private double sum = 0; private long count = 0; private double min = Double.POSITIVE_INFINITY; @@ -60,13 +41,13 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat super(name, bucketsPaths, gapPolicy, formatter, metaData); } - StatsBucketPipelineAggregator() { - // For serialization + public StatsBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return StatsBucketPipelineAggregationBuilder.NAME; } @Override @@ -89,5 +70,4 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { return new InternalStatsBucket(name(), count, sum, min, max, format, pipelineAggregators, metadata); } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java index 792b9d1f7f..82920be6cf 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregationBuilder.java @@ -36,20 +36,20 @@ import java.util.Objects; public class ExtendedStatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder { - public static final String NAME = ExtendedStatsBucketPipelineAggregator.TYPE.name(); + public static final String NAME = "extended_stats_bucket"; public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private double sigma = 2.0; public ExtendedStatsBucketPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** * Read from a stream. */ public ExtendedStatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, ExtendedStatsBucketPipelineAggregator.TYPE.name()); + super(in, NAME); sigma = in.readDouble(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java index d5e8628d6b..03ed20d494 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/ExtendedStatsBucketPipelineAggregator.java @@ -23,10 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; import java.io.IOException; @@ -34,29 +32,12 @@ import java.util.List; import java.util.Map; public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { - - public final static Type TYPE = new Type("extended_stats_bucket"); - - public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException { - ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - InternalExtendedStatsBucket.registerStreams(); - } - + private final double sigma; private double sum = 0; private long count = 0; private double min = Double.POSITIVE_INFINITY; private double max = Double.NEGATIVE_INFINITY; private double sumOfSqrs = 1; - private double sigma; protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy, DocValueFormat formatter, Map metaData) { @@ -64,13 +45,22 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline this.sigma = sigma; } - ExtendedStatsBucketPipelineAggregator() { - // For Serialization + /** + * Read from a stream. + */ + public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException { + super(in); + sigma = in.readDouble(); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeDouble(sigma); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return ExtendedStatsBucketPipelineAggregationBuilder.NAME; } @Override @@ -95,14 +85,4 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, format, pipelineAggregators, metadata); } - - @Override - protected void innerReadFrom(StreamInput in) throws IOException { - sigma = in.readDouble(); - } - - @Override - protected void innerWriteTo(StreamOutput out) throws IOException { - out.writeDouble(sigma); - } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java index 548e644329..5589a9ebbc 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/stats/extended/InternalExtendedStatsBucket.java @@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.exten import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -31,35 +30,22 @@ import java.util.List; import java.util.Map; public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket { - - public final static Type TYPE = new Type("extended_stats_bucket"); - - public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException { - InternalExtendedStatsBucket result = new InternalExtendedStatsBucket(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma, DocValueFormat formatter, List pipelineAggregators, Map metaData) { super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData); } - InternalExtendedStatsBucket() { - // for serialization + /** + * Read from a stream. + */ + public InternalExtendedStatsBucket(StreamInput in) throws IOException { + super(in); } @Override - public Type type() { - return TYPE; + public String getWriteableName() { + return ExtendedStatsBucketPipelineAggregationBuilder.NAME; } @Override -- cgit v1.2.3