summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-07-13 12:21:51 -0400
committerNik Everett <nik9000@gmail.com>2016-07-18 10:14:09 -0400
commit16812cc0321e56466a3d23bc6c1521d0cdd9ce77 (patch)
tree6cee6e68d164ac1d4824b8cd7cecbd2edd685361 /core
parent83945445488630c7bcd33de9105045a9c6a77307 (diff)
Migrate moving_avg pipeline aggregation to NamedWriteable
This is the first pipeline aggregation that doesn't have its own bucket type that needs serializing. It uses InternalHistogram instead. So that required reworking the new-style `registerAggregation` method to not require bucket readers. So I built `PipelineAggregationSpec` to mirror `AggregationSpec`. It allows registering any number of bucket readers or result readers.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java142
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java80
3 files changed, 136 insertions, 92 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index baf7276313..719b1ab110 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -98,7 +98,6 @@ import org.elasticsearch.plugins.SearchPlugin.SearchPluginSpec;
import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.Aggregator.Parser;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@@ -381,8 +380,10 @@ public class SearchModule extends AbstractModule {
* Register an aggregation.
*/
public void registerAggregation(AggregationSpec spec) {
- namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
- aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
+ if (false == transportClient) {
+ aggregationParserRegistry.register(spec.parser, spec.name);
+ }
+ namedWriteableRegistry.register(AggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.resultReaders.entrySet()) {
String writeableName = t.getKey();
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
@@ -393,29 +394,30 @@ public class SearchModule extends AbstractModule {
public static class AggregationSpec {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
- private final Aggregator.Parser aggregationParser;
- private final ParseField aggregationName;
+ private final Aggregator.Parser parser;
+ private final ParseField name;
/**
* Register an aggregation.
*
* @param builderReader reads the {@link AggregationBuilder} from a stream
- * @param aggregationParser reads the aggregation builder from XContent
- * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the
- * reader is registered under.
+ * @param parser reads the aggregation builder from XContent
+ * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
+ * registered under.
*/
- public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Parser aggregationParser, ParseField aggregationName) {
+ public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser parser,
+ ParseField name) {
this.builderReader = builderReader;
- this.aggregationParser = aggregationParser;
- this.aggregationName = aggregationName;
+ this.parser = parser;
+ this.name = name;
}
/**
- * Add a reader for the shard level results of the aggregation with {@linkplain #aggregationName}'s
- * {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}.
+ * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
+ * the {@link NamedWriteable#getWriteableName()}.
*/
public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
- return addResultReader(aggregationName.getPreferredName(), resultReader);
+ return addResultReader(name.getPreferredName(), resultReader);
}
/**
@@ -429,23 +431,73 @@ public class SearchModule extends AbstractModule {
/**
* Register a pipeline aggregation.
- *
- * @param reader reads the aggregation builder from a stream
- * @param internalReader reads the {@link PipelineAggregator} from a stream
- * @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
- * @param aggregationParser reads the aggregation builder from XContent
- * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
- * is registered under.
*/
- public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
- Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader,
- PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
+ public void registerPipelineAggregation(PipelineAggregationSpec spec) {
if (false == transportClient) {
- pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
+ pipelineAggregationParserRegistry.register(spec.parser, spec.name);
+ }
+ namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
+ for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) {
+ namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue());
+ }
+ for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) {
+ namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue());
+ }
+ }
+
+ public static class PipelineAggregationSpec {
+ private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>();
+ private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>();
+ private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader;
+ private final PipelineAggregator.Parser parser;
+ private final ParseField name;
+
+ /**
+ * Register a pipeline aggregation.
+ *
+ * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream
+ * @param parser reads the aggregation builder from XContent
+ * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
+ * registered under.
+ */
+ public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader,
+ PipelineAggregator.Parser parser, ParseField name) {
+ this.builderReader = builderReader;
+ this.parser = parser;
+ this.name = name;
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
+ * the {@link NamedWriteable#getWriteableName()}.
+ */
+ public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) {
+ return addResultReader(name.getPreferredName(), resultReader);
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation.
+ */
+ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) {
+ resultReaders.put(writeableName, resultReader);
+ return this;
+ }
+
+ /**
+ * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()}
+ * as the {@link NamedWriteable#getWriteableName()}.
+ */
+ public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
+ return addBucketReader(name.getPreferredName(), resultReader);
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation.
+ */
+ public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
+ bucketReaders.put(writeableName, resultReader);
+ return this;
}
- namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
- namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader);
- namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader);
}
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
@@ -552,8 +604,12 @@ public class SearchModule extends AbstractModule {
registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new));
- registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new,
- DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ DerivativePipelineAggregationBuilder::new,
+ DerivativePipelineAggregationBuilder::parse,
+ DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(DerivativePipelineAggregator::new)
+ .addBucketReader(InternalDerivative::new));
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
@@ -562,17 +618,26 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new,
- InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER,
- StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new,
- InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(),
- ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ StatsBucketPipelineAggregationBuilder::new,
+ StatsBucketPipelineAggregationBuilder.PARSER,
+ StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(StatsBucketPipelineAggregator::new)
+ .addBucketReader(InternalStatsBucket::new));
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ ExtendedStatsBucketPipelineAggregationBuilder::new,
+ new ExtendedStatsBucketParser(),
+ ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(ExtendedStatsBucketPipelineAggregator::new)
+ .addBucketReader(InternalExtendedStatsBucket::new));
registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(MovAvgPipelineAggregationBuilder::new,
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ MovAvgPipelineAggregationBuilder::new,
(n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c),
- MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME);
+ MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME)
+ .addResultReader(MovAvgPipelineAggregator::new)
+ /* Uses InternalHistogram for buckets */);
registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse,
CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse,
@@ -821,7 +886,6 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
- MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
index 7d71af17b4..f9d21087d6 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
@@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> {
- public static final String NAME = MovAvgPipelineAggregator.TYPE.name();
+ public static final String NAME = "moving_avg";
public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME);
public static final ParseField MODEL = new ParseField("model");
@@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio
private Boolean minimize;
public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) {
- super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+ super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException {
- super(in, MovAvgPipelineAggregator.TYPE.name());
+ super(in, NAME);
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
index 880b4e4e6a..216890741b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
@@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.joda.time.DateTime;
@@ -47,31 +45,12 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class MovAvgPipelineAggregator extends PipelineAggregator {
-
- public static final Type TYPE = new Type("moving_avg");
-
- public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException {
- MovAvgPipelineAggregator result = new MovAvgPipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- }
-
- private DocValueFormat formatter;
- private GapPolicy gapPolicy;
- private int window;
+ private final DocValueFormat formatter;
+ private final GapPolicy gapPolicy;
+ private final int window;
private MovAvgModel model;
- private int predict;
- private boolean minimize;
-
- public MovAvgPipelineAggregator() {
- }
+ private final int predict;
+ private final boolean minimize;
public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) {
@@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
this.minimize = minimize;
}
+ /**
+ * Read from a stream.
+ */
+ public MovAvgPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ formatter = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ window = in.readVInt();
+ predict = in.readVInt();
+ model = in.readNamedWriteable(MovAvgModel.class);
+ minimize = in.readBoolean();
+ }
+
@Override
- public Type type() {
- return TYPE;
+ public void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(formatter);
+ gapPolicy.writeTo(out);
+ out.writeVInt(window);
+ out.writeVInt(predict);
+ out.writeNamedWriteable(model);
+ out.writeBoolean(minimize);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return MovAvgPipelineAggregationBuilder.NAME;
}
@Override
@@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
return SimulatedAnealingMinimizer.minimize(model, values, test);
}
-
- @Override
- public void doReadFrom(StreamInput in) throws IOException {
- formatter = in.readNamedWriteable(DocValueFormat.class);
- gapPolicy = GapPolicy.readFrom(in);
- window = in.readVInt();
- predict = in.readVInt();
- model = in.readNamedWriteable(MovAvgModel.class);
- minimize = in.readBoolean();
-
- }
-
- @Override
- public void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(formatter);
- gapPolicy.writeTo(out);
- out.writeVInt(window);
- out.writeVInt(predict);
- out.writeNamedWriteable(model);
- out.writeBoolean(minimize);
-
- }
}