summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java142
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java80
3 files changed, 136 insertions, 92 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index baf7276313..719b1ab110 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -98,7 +98,6 @@ import org.elasticsearch.plugins.SearchPlugin.SearchPluginSpec;
import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.Aggregator.Parser;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@@ -381,8 +380,10 @@ public class SearchModule extends AbstractModule {
* Register an aggregation.
*/
public void registerAggregation(AggregationSpec spec) {
- namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
- aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
+ if (false == transportClient) {
+ aggregationParserRegistry.register(spec.parser, spec.name);
+ }
+ namedWriteableRegistry.register(AggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.resultReaders.entrySet()) {
String writeableName = t.getKey();
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
@@ -393,29 +394,30 @@ public class SearchModule extends AbstractModule {
public static class AggregationSpec {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
- private final Aggregator.Parser aggregationParser;
- private final ParseField aggregationName;
+ private final Aggregator.Parser parser;
+ private final ParseField name;
/**
* Register an aggregation.
*
* @param builderReader reads the {@link AggregationBuilder} from a stream
- * @param aggregationParser reads the aggregation builder from XContent
- * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the
- * reader is registered under.
+ * @param parser reads the aggregation builder from XContent
+ * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
+ * registered under.
*/
- public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Parser aggregationParser, ParseField aggregationName) {
+ public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser parser,
+ ParseField name) {
this.builderReader = builderReader;
- this.aggregationParser = aggregationParser;
- this.aggregationName = aggregationName;
+ this.parser = parser;
+ this.name = name;
}
/**
- * Add a reader for the shard level results of the aggregation with {@linkplain #aggregationName}'s
- * {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}.
+ * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
+ * the {@link NamedWriteable#getWriteableName()}.
*/
public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
- return addResultReader(aggregationName.getPreferredName(), resultReader);
+ return addResultReader(name.getPreferredName(), resultReader);
}
/**
@@ -429,23 +431,73 @@ public class SearchModule extends AbstractModule {
/**
* Register a pipeline aggregation.
- *
- * @param reader reads the aggregation builder from a stream
- * @param internalReader reads the {@link PipelineAggregator} from a stream
- * @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
- * @param aggregationParser reads the aggregation builder from XContent
- * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
- * is registered under.
*/
- public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
- Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader,
- PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
+ public void registerPipelineAggregation(PipelineAggregationSpec spec) {
if (false == transportClient) {
- pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
+ pipelineAggregationParserRegistry.register(spec.parser, spec.name);
+ }
+ namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
+ for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) {
+ namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue());
+ }
+ for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) {
+ namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue());
+ }
+ }
+
+ public static class PipelineAggregationSpec {
+ private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>();
+ private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>();
+ private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader;
+ private final PipelineAggregator.Parser parser;
+ private final ParseField name;
+
+ /**
+ * Register a pipeline aggregation.
+ *
+ * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream
+ * @param parser reads the aggregation builder from XContent
+ * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
+ * registered under.
+ */
+ public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader,
+ PipelineAggregator.Parser parser, ParseField name) {
+ this.builderReader = builderReader;
+ this.parser = parser;
+ this.name = name;
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
+ * the {@link NamedWriteable#getWriteableName()}.
+ */
+ public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) {
+ return addResultReader(name.getPreferredName(), resultReader);
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation.
+ */
+ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) {
+ resultReaders.put(writeableName, resultReader);
+ return this;
+ }
+
+ /**
+ * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()}
+ * as the {@link NamedWriteable#getWriteableName()}.
+ */
+ public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
+ return addBucketReader(name.getPreferredName(), resultReader);
+ }
+
+ /**
+ * Add a reader for the shard level results of the aggregation.
+ */
+ public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
+ bucketReaders.put(writeableName, resultReader);
+ return this;
}
- namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
- namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader);
- namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader);
}
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
@@ -552,8 +604,12 @@ public class SearchModule extends AbstractModule {
registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new));
- registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new,
- DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ DerivativePipelineAggregationBuilder::new,
+ DerivativePipelineAggregationBuilder::parse,
+ DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(DerivativePipelineAggregator::new)
+ .addBucketReader(InternalDerivative::new));
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
@@ -562,17 +618,26 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new,
- InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER,
- StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new,
- InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(),
- ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ StatsBucketPipelineAggregationBuilder::new,
+ StatsBucketPipelineAggregationBuilder.PARSER,
+ StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(StatsBucketPipelineAggregator::new)
+ .addBucketReader(InternalStatsBucket::new));
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ ExtendedStatsBucketPipelineAggregationBuilder::new,
+ new ExtendedStatsBucketParser(),
+ ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+ .addResultReader(ExtendedStatsBucketPipelineAggregator::new)
+ .addBucketReader(InternalExtendedStatsBucket::new));
registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
- registerPipelineAggregation(MovAvgPipelineAggregationBuilder::new,
+ registerPipelineAggregation(new PipelineAggregationSpec(
+ MovAvgPipelineAggregationBuilder::new,
(n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c),
- MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME);
+ MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME)
+ .addResultReader(MovAvgPipelineAggregator::new)
+ /* Uses InternalHistogram for buckets */);
registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse,
CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse,
@@ -821,7 +886,6 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
- MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
index 7d71af17b4..f9d21087d6 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
@@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> {
- public static final String NAME = MovAvgPipelineAggregator.TYPE.name();
+ public static final String NAME = "moving_avg";
public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME);
public static final ParseField MODEL = new ParseField("model");
@@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio
private Boolean minimize;
public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) {
- super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+ super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException {
- super(in, MovAvgPipelineAggregator.TYPE.name());
+ super(in, NAME);
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
index 880b4e4e6a..216890741b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
@@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.joda.time.DateTime;
@@ -47,31 +45,12 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class MovAvgPipelineAggregator extends PipelineAggregator {
-
- public static final Type TYPE = new Type("moving_avg");
-
- public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException {
- MovAvgPipelineAggregator result = new MovAvgPipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- }
-
- private DocValueFormat formatter;
- private GapPolicy gapPolicy;
- private int window;
+ private final DocValueFormat formatter;
+ private final GapPolicy gapPolicy;
+ private final int window;
private MovAvgModel model;
- private int predict;
- private boolean minimize;
-
- public MovAvgPipelineAggregator() {
- }
+ private final int predict;
+ private final boolean minimize;
public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) {
@@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
this.minimize = minimize;
}
+ /**
+ * Read from a stream.
+ */
+ public MovAvgPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ formatter = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ window = in.readVInt();
+ predict = in.readVInt();
+ model = in.readNamedWriteable(MovAvgModel.class);
+ minimize = in.readBoolean();
+ }
+
@Override
- public Type type() {
- return TYPE;
+ public void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(formatter);
+ gapPolicy.writeTo(out);
+ out.writeVInt(window);
+ out.writeVInt(predict);
+ out.writeNamedWriteable(model);
+ out.writeBoolean(minimize);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return MovAvgPipelineAggregationBuilder.NAME;
}
@Override
@@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
return SimulatedAnealingMinimizer.minimize(model, values, test);
}
-
- @Override
- public void doReadFrom(StreamInput in) throws IOException {
- formatter = in.readNamedWriteable(DocValueFormat.class);
- gapPolicy = GapPolicy.readFrom(in);
- window = in.readVInt();
- predict = in.readVInt();
- model = in.readNamedWriteable(MovAvgModel.class);
- minimize = in.readBoolean();
-
- }
-
- @Override
- public void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(formatter);
- gapPolicy.writeTo(out);
- out.writeVInt(window);
- out.writeVInt(predict);
- out.writeNamedWriteable(model);
- out.writeBoolean(minimize);
-
- }
}