summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/search/aggregations/pipeline
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-07-13 12:21:51 -0400
committerNik Everett <nik9000@gmail.com>2016-07-18 10:14:09 -0400
commit16812cc0321e56466a3d23bc6c1521d0cdd9ce77 (patch)
tree6cee6e68d164ac1d4824b8cd7cecbd2edd685361 /core/src/main/java/org/elasticsearch/search/aggregations/pipeline
parent83945445488630c7bcd33de9105045a9c6a77307 (diff)
Migrate moving_avg pipeline aggregation to NamedWriteable
This is the first pipeline aggregation that doesn't have its own bucket type that needs serializing. It uses InternalHistogram instead. So that required reworking the new-style `registerAggregation` method to not require bucket readers. So I built `PipelineAggregationSpec` to mirror `AggregationSpec`. It allows registering any number of bucket readers or result readers.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/pipeline')
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java80
2 files changed, 33 insertions, 53 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
index 7d71af17b4..f9d21087d6 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java
@@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> {
- public static final String NAME = MovAvgPipelineAggregator.TYPE.name();
+ public static final String NAME = "moving_avg";
public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME);
public static final ParseField MODEL = new ParseField("model");
@@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio
private Boolean minimize;
public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) {
- super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+ super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException {
- super(in, MovAvgPipelineAggregator.TYPE.name());
+ super(in, NAME);
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
index 880b4e4e6a..216890741b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java
@@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.joda.time.DateTime;
@@ -47,31 +45,12 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class MovAvgPipelineAggregator extends PipelineAggregator {
-
- public static final Type TYPE = new Type("moving_avg");
-
- public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException {
- MovAvgPipelineAggregator result = new MovAvgPipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- }
-
- private DocValueFormat formatter;
- private GapPolicy gapPolicy;
- private int window;
+ private final DocValueFormat formatter;
+ private final GapPolicy gapPolicy;
+ private final int window;
private MovAvgModel model;
- private int predict;
- private boolean minimize;
-
- public MovAvgPipelineAggregator() {
- }
+ private final int predict;
+ private final boolean minimize;
public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) {
@@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
this.minimize = minimize;
}
+ /**
+ * Read from a stream.
+ */
+ public MovAvgPipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ formatter = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ window = in.readVInt();
+ predict = in.readVInt();
+ model = in.readNamedWriteable(MovAvgModel.class);
+ minimize = in.readBoolean();
+ }
+
@Override
- public Type type() {
- return TYPE;
+ public void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(formatter);
+ gapPolicy.writeTo(out);
+ out.writeVInt(window);
+ out.writeVInt(predict);
+ out.writeNamedWriteable(model);
+ out.writeBoolean(minimize);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return MovAvgPipelineAggregationBuilder.NAME;
}
@Override
@@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
return SimulatedAnealingMinimizer.minimize(model, values, test);
}
-
- @Override
- public void doReadFrom(StreamInput in) throws IOException {
- formatter = in.readNamedWriteable(DocValueFormat.class);
- gapPolicy = GapPolicy.readFrom(in);
- window = in.readVInt();
- predict = in.readVInt();
- model = in.readNamedWriteable(MovAvgModel.class);
- minimize = in.readBoolean();
-
- }
-
- @Override
- public void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(formatter);
- gapPolicy.writeTo(out);
- out.writeVInt(window);
- out.writeVInt(predict);
- out.writeNamedWriteable(model);
- out.writeBoolean(minimize);
-
- }
}