diff options
3 files changed, 136 insertions, 92 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index baf7276313..719b1ab110 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -98,7 +98,6 @@ import org.elasticsearch.plugins.SearchPlugin.SearchPluginSpec; import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.Aggregator.Parser; import org.elasticsearch.search.aggregations.AggregatorParsers; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; @@ -381,8 +380,10 @@ public class SearchModule extends AbstractModule { * Register an aggregation. */ public void registerAggregation(AggregationSpec spec) { - namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader); - aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName); + if (false == transportClient) { + aggregationParserRegistry.register(spec.parser, spec.name); + } + namedWriteableRegistry.register(AggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader); for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.resultReaders.entrySet()) { String writeableName = t.getKey(); Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue(); @@ -393,29 +394,30 @@ public class SearchModule extends AbstractModule { public static class AggregationSpec { private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>(); private final Writeable.Reader<? extends AggregationBuilder> builderReader; - private final Aggregator.Parser aggregationParser; - private final ParseField aggregationName; + private final Aggregator.Parser parser; + private final ParseField name; /** * Register an aggregation. * * @param builderReader reads the {@link AggregationBuilder} from a stream - * @param aggregationParser reads the aggregation builder from XContent - * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the - * reader is registered under. + * @param parser reads the aggregation builder from XContent + * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is + * registered under. */ - public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Parser aggregationParser, ParseField aggregationName) { + public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser parser, + ParseField name) { this.builderReader = builderReader; - this.aggregationParser = aggregationParser; - this.aggregationName = aggregationName; + this.parser = parser; + this.name = name; } /** - * Add a reader for the shard level results of the aggregation with {@linkplain #aggregationName}'s - * {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}. + * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as + * the {@link NamedWriteable#getWriteableName()}. */ public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) { - return addResultReader(aggregationName.getPreferredName(), resultReader); + return addResultReader(name.getPreferredName(), resultReader); } /** @@ -429,23 +431,73 @@ public class SearchModule extends AbstractModule { /** * Register a pipeline aggregation. - * - * @param reader reads the aggregation builder from a stream - * @param internalReader reads the {@link PipelineAggregator} from a stream - * @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream - * @param aggregationParser reads the aggregation builder from XContent - * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader - * is registered under. */ - public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, - Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader, - PipelineAggregator.Parser aggregationParser, ParseField aggregationName) { + public void registerPipelineAggregation(PipelineAggregationSpec spec) { if (false == transportClient) { - pipelineAggregationParserRegistry.register(aggregationParser, aggregationName); + pipelineAggregationParserRegistry.register(spec.parser, spec.name); + } + namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader); + for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) { + namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue()); + } + for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) { + namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue()); + } + } + + public static class PipelineAggregationSpec { + private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>(); + private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>(); + private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader; + private final PipelineAggregator.Parser parser; + private final ParseField name; + + /** + * Register a pipeline aggregation. + * + * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream + * @param parser reads the aggregation builder from XContent + * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is + * registered under. + */ + public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader, + PipelineAggregator.Parser parser, ParseField name) { + this.builderReader = builderReader; + this.parser = parser; + this.name = name; + } + + /** + * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as + * the {@link NamedWriteable#getWriteableName()}. + */ + public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) { + return addResultReader(name.getPreferredName(), resultReader); + } + + /** + * Add a reader for the shard level results of the aggregation. + */ + public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) { + resultReaders.put(writeableName, resultReader); + return this; + } + + /** + * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()} + * as the {@link NamedWriteable#getWriteableName()}. + */ + public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) { + return addBucketReader(name.getPreferredName(), resultReader); + } + + /** + * Add a reader for the shard level results of the aggregation. + */ + public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) { + bucketReaders.put(writeableName, resultReader); + return this; } - namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader); - namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader); - namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader); } public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, @@ -552,8 +604,12 @@ public class SearchModule extends AbstractModule { registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse, ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new)); - registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new, - DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(new PipelineAggregationSpec( + DerivativePipelineAggregationBuilder::new, + DerivativePipelineAggregationBuilder::parse, + DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(DerivativePipelineAggregator::new) + .addBucketReader(InternalDerivative::new)); registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER, MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER, @@ -562,17 +618,26 @@ public class SearchModule extends AbstractModule { AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER, SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new, - InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER, - StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new, - InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(), - ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(new PipelineAggregationSpec( + StatsBucketPipelineAggregationBuilder::new, + StatsBucketPipelineAggregationBuilder.PARSER, + StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(StatsBucketPipelineAggregator::new) + .addBucketReader(InternalStatsBucket::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + ExtendedStatsBucketPipelineAggregationBuilder::new, + new ExtendedStatsBucketParser(), + ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(ExtendedStatsBucketPipelineAggregator::new) + .addBucketReader(InternalExtendedStatsBucket::new)); registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER, PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(MovAvgPipelineAggregationBuilder::new, + registerPipelineAggregation(new PipelineAggregationSpec( + MovAvgPipelineAggregationBuilder::new, (n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c), - MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME); + MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME) + .addResultReader(MovAvgPipelineAggregator::new) + /* Uses InternalHistogram for buckets */); registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse, CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse, @@ -821,7 +886,6 @@ public class SearchModule extends AbstractModule { AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); PercentilesBucketPipelineAggregator.registerStreams(); - MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); BucketSelectorPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java index 7d71af17b4..f9d21087d6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java @@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator. import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> { - public static final String NAME = MovAvgPipelineAggregator.TYPE.name(); + public static final String NAME = "moving_avg"; public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME); public static final ParseField MODEL = new ParseField("model"); @@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio private Boolean minimize; public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** * Read from a stream. */ public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, MovAvgPipelineAggregator.TYPE.name()); + super(in, NAME); format = in.readOptionalString(); gapPolicy = GapPolicy.readFrom(in); window = in.readVInt(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index 880b4e4e6a..216890741b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.joda.time.DateTime; @@ -47,31 +45,12 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class MovAvgPipelineAggregator extends PipelineAggregator { - - public static final Type TYPE = new Type("moving_avg"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException { - MovAvgPipelineAggregator result = new MovAvgPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - - private DocValueFormat formatter; - private GapPolicy gapPolicy; - private int window; + private final DocValueFormat formatter; + private final GapPolicy gapPolicy; + private final int window; private MovAvgModel model; - private int predict; - private boolean minimize; - - public MovAvgPipelineAggregator() { - } + private final int predict; + private final boolean minimize; public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy, int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) { @@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { this.minimize = minimize; } + /** + * Read from a stream. + */ + public MovAvgPipelineAggregator(StreamInput in) throws IOException { + super(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + window = in.readVInt(); + predict = in.readVInt(); + model = in.readNamedWriteable(MovAvgModel.class); + minimize = in.readBoolean(); + } + @Override - public Type type() { - return TYPE; + public void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(formatter); + gapPolicy.writeTo(out); + out.writeVInt(window); + out.writeVInt(predict); + out.writeNamedWriteable(model); + out.writeBoolean(minimize); + } + + @Override + public String getWriteableName() { + return MovAvgPipelineAggregationBuilder.NAME; } @Override @@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { return SimulatedAnealingMinimizer.minimize(model, values, test); } - - @Override - public void doReadFrom(StreamInput in) throws IOException { - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - window = in.readVInt(); - predict = in.readVInt(); - model = in.readNamedWriteable(MovAvgModel.class); - minimize = in.readBoolean(); - - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeVInt(window); - out.writeVInt(predict); - out.writeNamedWriteable(model); - out.writeBoolean(minimize); - - } } |