summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java12
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java10
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java8
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java69
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java117
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java33
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java8
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java10
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java2
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java5
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java6
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java5
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java10
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java3
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java8
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java1
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java40
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java69
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java7
-rw-r--r--core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java41
-rw-r--r--core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java22
-rw-r--r--modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java1
24 files changed, 68 insertions, 427 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
index 0f6ab65db1..4c5f9757ca 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
@@ -836,6 +836,18 @@ public abstract class StreamInput extends InputStream {
return builder;
}
+ /**
+ * Reads a list of {@link NamedWriteable}s.
+ */
+ public <T extends NamedWriteable> List<T> readNamedWriteableList(Class<T> categoryClass) throws IOException {
+ int count = readVInt();
+ List<T> builder = new ArrayList<>(count);
+ for (int i=0; i<count; i++) {
+ builder.add(readNamedWriteable(categoryClass));
+ }
+ return builder;
+ }
+
public static StreamInput wrap(byte[] bytes) {
return wrap(bytes, 0, bytes.length);
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index 3ee6e94a4a..24350936fa 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -835,4 +835,14 @@ public abstract class StreamOutput extends OutputStream {
obj.writeTo(this);
}
}
+
+ /**
+ * Writes a list of {@link NamedWriteable} objects.
+ */
+ public void writeNamedWriteableList(List<? extends NamedWriteable> list) throws IOException {
+ writeVInt(list.size());
+ for (NamedWriteable obj: list) {
+ writeNamedWriteable(obj);
+ }
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index e8ddc5298a..04c77d9b1e 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -486,14 +486,6 @@ public class SearchModule extends AbstractModule {
}
}
- public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
- PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
- // NORELEASE remove me in favor of the above method
- pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
- namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
- }
-
-
@Override
protected void configure() {
if (false == transportClient) {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
index f3ad47910e..5dc2937431 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -154,8 +154,8 @@ public class AggregationPhase implements SearchPhase {
siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
} else {
throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name()
- + "] of type [" + pipelineAggregator.type().name()
- + "]. Only sibling pipeline aggregations are allowed at the top level");
+ + "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are "
+ + "allowed at the top level");
}
}
context.queryResult().pipelineAggregators(siblingPipelineAggregators);
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java
deleted file mode 100644
index 81228be260..0000000000
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations;
-
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.StreamInput;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.unmodifiableMap;
-
-/**
- * A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that
- * know how to stream themselves.
- */
-public class AggregationStreams {
- private static Map<BytesReference, Stream> streams = emptyMap();
-
- /**
- * A stream that knows how to read an aggregation from the input.
- */
- public interface Stream {
- InternalAggregation readResult(StreamInput in) throws IOException;
- }
-
- /**
- * Registers the given stream and associate it with the given types.
- *
- * @param stream The streams to register
- * @param types The types associated with the streams
- */
- public static synchronized void registerStream(Stream stream, BytesReference... types) {
- Map<BytesReference, Stream> newStreams = new HashMap<>(streams);
- for (BytesReference type : types) {
- newStreams.put(type, stream);
- }
- streams = unmodifiableMap(newStreams);
- }
-
- /**
- * Returns the stream that is registered for the given type
- *
- * @param type The given type
- * @return The associated stream
- */
- public static Stream stream(BytesReference type) {
- return streams.get(type);
- }
-
-}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
index 52a0ad1b17..b7635d3dc3 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
@@ -20,55 +20,35 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
*/
-public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
- // NORELEASE remove Streamable
-
+public abstract class InternalAggregation implements Aggregation, ToXContent, NamedWriteable {
/**
* The aggregation type that holds all the string types that are associated with an aggregation:
* <ul>
* <li>name - used as the parser type</li>
- * <li>stream - used as the stream type</li>
* </ul>
*/
public static class Type {
-
- private String name;
- private BytesReference stream;
+ private final String name;
public Type(String name) {
- this(name, new BytesArray(name));
- }
-
- public Type(String name, String stream) {
- this(name, new BytesArray(stream));
- }
-
- public Type(String name, BytesReference stream) {
this.name = name;
- this.stream = stream;
}
/**
@@ -79,14 +59,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
return name;
}
- /**
- * @return The name of the stream type (used for registering the aggregation stream
- * (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, BytesReference...)}).
- */
- public BytesReference stream() {
- return stream;
- }
-
@Override
public String toString() {
return name;
@@ -118,15 +90,11 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
}
}
+ protected final String name;
- protected String name;
-
- protected Map<String, Object> metaData;
-
- private List<PipelineAggregator> pipelineAggregators;
+ protected final Map<String, Object> metaData;
- /** Constructs an un initialized addAggregation (used for serialization) **/
- protected InternalAggregation() {} // NORELEASE remove when removing Streamable
+ private final List<PipelineAggregator> pipelineAggregators;
/**
* Constructs an get with a given name.
@@ -145,82 +113,19 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
- int size = in.readVInt();
- if (size == 0) {
- pipelineAggregators = Collections.emptyList();
- } else {
- pipelineAggregators = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- if (in.readBoolean()) {
- pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
- } else {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add(pipelineAggregator);
- }
- }
- }
- }
-
- @Override
- public final void readFrom(StreamInput in) throws IOException {
- try {
- getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
- assert false : "Used reading constructor instead";
- } catch (UnsupportedOperationException e) {
- // OK
- }
- name = in.readString();
- metaData = in.readMap();
- int size = in.readVInt();
- if (size == 0) {
- pipelineAggregators = Collections.emptyList();
- } else {
- pipelineAggregators = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- if (in.readBoolean()) {
- pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
- } else {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add(pipelineAggregator);
- }
- }
- }
- doReadFrom(in);
- }
-
- protected void doReadFrom(StreamInput in) throws IOException {
- throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
+ pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
- out.writeVInt(pipelineAggregators.size());
- for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
- // NORELEASE temporary hack to support old style streams and new style NamedWriteable
- try {
- pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
- out.writeBoolean(true);
- out.writeNamedWriteable(pipelineAggregator);
- } catch (UnsupportedOperationException e) {
- out.writeBoolean(false);
- out.writeBytesReference(pipelineAggregator.type().stream());
- pipelineAggregator.writeTo(out);
- }
- }
+ out.writeNamedWriteableList(pipelineAggregators);
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
- @Override
- public String getWriteableName() {
- // NORELEASE remove me when all InternalAggregations override it
- throw new UnsupportedOperationException("Override on every class");
- }
@Override
public String getName() {
@@ -228,14 +133,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
}
/**
- * @return The {@link Type} of this aggregation
- */
- public Type type() {
- // NORELEASE remove this method
- throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
- }
-
- /**
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
* try reusing an existing instance (typically the first in the given list) to save on redundant object
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
index 3628133580..66e45156ca 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations;
-import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -198,41 +197,15 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
@Override
public void readFrom(StreamInput in) throws IOException {
- int size = in.readVInt();
- if (size == 0) {
- aggregations = Collections.emptyList();
+ aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class));
+ if (aggregations.isEmpty()) {
aggregationsAsMap = emptyMap();
- } else {
- aggregations = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
- if (in.readBoolean()) {
- aggregations.add(in.readNamedWriteable(InternalAggregation.class));
- } else {
- BytesReference type = in.readBytesReference();
- InternalAggregation aggregation = AggregationStreams.stream(type).readResult(in);
- aggregations.add(aggregation);
- }
- }
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(aggregations.size());
- for (Aggregation aggregation : aggregations) {
- InternalAggregation internal = (InternalAggregation) aggregation;
- // NORELEASE Temporary hack to support old style streams and new style NamedWriteable at the same time
- try {
- internal.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
- out.writeBoolean(true);
- out.writeNamedWriteable(internal);
- } catch (UnsupportedOperationException e) {
- out.writeBoolean(false);
- out.writeBytesReference(internal.type().stream());
- internal.writeTo(out);
- }
- }
+ out.writeNamedWriteableList(aggregations);
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
index 46867d24f4..f1e8b7358c 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
@@ -29,10 +29,6 @@ import java.util.Map;
public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation, B extends InternalMultiBucketAggregation.InternalBucket>
extends InternalAggregation implements MultiBucketsAggregation {
-
- public InternalMultiBucketAggregation() {
- }
-
public InternalMultiBucketAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
index 9b88ab4705..e8b0468006 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
@@ -39,8 +39,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
private long docCount;
private InternalAggregations aggregations;
- protected InternalSingleBucketAggregation() {} // for serialization
-
/**
* Creates a single bucket aggregation.
*
@@ -64,12 +62,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
}
@Override
- protected void doReadFrom(StreamInput in) throws IOException {
- docCount = in.readVLong();
- aggregations = InternalAggregations.readAggregations(in);
- }
-
- @Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(docCount);
aggregations.writeTo(out);
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java
index 39603c9ab4..2d8d26dd35 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java
@@ -40,9 +40,7 @@ public interface MultiBucketsAggregation extends Aggregation {
* A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
*/
- public interface Bucket extends HasAggregations, ToXContent, Streamable, Writeable {
- // NORELEASE remove Streamable
-
+ public interface Bucket extends HasAggregations, ToXContent, Writeable {
/**
* @return The key associated with the bucket
*/
@@ -66,12 +64,6 @@ public interface MultiBucketsAggregation extends Aggregation {
Object getProperty(String containingAggName, List<String> path);
- @Override
- default void readFrom(StreamInput in) throws IOException {
- // NORELEASE remove me when no Buckets override it
- throw new UnsupportedOperationException("Prefer the Writeable interface");
- }
-
static class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> {
private final AggregationPath path;
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
index bb0acd76be..0dfc9c8e53 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
@@ -35,7 +35,7 @@ import java.io.IOException;
public class InternalDateHistogram {
public static final Factory HISTOGRAM_FACTORY = new Factory();
- static final Type TYPE = new Type("date_histogram", "dhisto");
+ static final Type TYPE = new Type("date_histogram");
static class Bucket extends InternalHistogram.Bucket {
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter,
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
index 8891f75d4b..29ae9a1016 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
@@ -303,11 +303,6 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
@Override
- public Type type() {
- return TYPE;
- }
-
- @Override
public List<B> getBuckets() {
return buckets;
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
index e7e7533088..54ee27bfa9 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
@@ -173,13 +173,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
}
@Override
- public void readFrom(StreamInput in) throws IOException {
-
- }
-
- @Override
public void writeTo(StreamOutput out) throws IOException {
-
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
index c4ade93e60..00a066c11b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
@@ -242,11 +242,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
- public void readFrom(StreamInput in) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
protected void writeTermTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
index 56d69d814f..a8b4c44ce4 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
@@ -196,13 +196,9 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
!terms.getClass().equals(UnmappedTerms.class)) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
- throw new AggregationExecutionException("Merging/Reducing the aggregations failed " +
- "when computing the aggregation [ Name: " +
- referenceTerms.getName() + ", Type: " +
- referenceTerms.type() + " ]" + " because: " +
- "the field you gave in the aggregation query " +
- "existed as two different types " +
- "in two different indices");
+ throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation ["
+ + referenceTerms.getName() + "] because the field you gave in the aggregation query existed as two different "
+ + "types in two different indices");
}
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
index a5c64c5cde..ded69d9f75 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
@@ -28,9 +28,6 @@ import java.util.List;
import java.util.Map;
public abstract class InternalMetricsAggregation extends InternalAggregation {
-
- protected InternalMetricsAggregation() {} // NORELEASE remove when we remove streamable
-
protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
index 3677150097..901c52a232 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
@@ -36,9 +36,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
protected DocValueFormat format = DEFAULT_FORMAT;
public abstract static class SingleValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.SingleValue {
-
- protected SingleValue() {} // NORELEASE remove when we remove Streamable
-
protected SingleValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
@@ -69,9 +66,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
}
public abstract static class MultiValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.MultiValue {
-
- protected MultiValue() {}
-
protected MultiValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
@@ -101,8 +95,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
}
}
- private InternalNumericMetricsAggregation() {} // NORELEASE remove when we remove Streamable
-
private InternalNumericMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java
index 7bfd54eb64..02953abc2d 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java
@@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
index 446b56d07e..f49ca1dd39 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java
@@ -24,19 +24,15 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import java.io.IOException;
import java.util.Map;
-public abstract class PipelineAggregator implements Streamable, NamedWriteable {
- // NORELEASE remove Streamable
-
+public abstract class PipelineAggregator implements NamedWriteable {
/**
* Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}.
*/
@@ -66,9 +62,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
private String[] bucketsPaths;
private Map<String, Object> metaData;
- protected PipelineAggregator() { // for Serialisation
- }
-
protected PipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
this.name = name;
this.bucketsPaths = bucketsPaths;
@@ -85,26 +78,8 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
}
@Override
- public final void readFrom(StreamInput in) throws IOException {
- try {
- getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
- assert false : "Used reading constructor instead";
- } catch (UnsupportedOperationException e) {
- // OK
- }
- name = in.readString();
- bucketsPaths = in.readStringArray();
- metaData = in.readMap();
- doReadFrom(in);
- }
-
- protected void doReadFrom(StreamInput in) throws IOException {
- throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
- }
-
- @Override
public final void writeTo(StreamOutput out) throws IOException {
- out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
+ out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metaData);
doWriteTo(out);
@@ -112,12 +87,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
protected abstract void doWriteTo(StreamOutput out) throws IOException;
- @Override
- public String getWriteableName() {
- // NORELEASE remove me when all InternalAggregations override it
- throw new UnsupportedOperationException("Override on every class");
- }
-
public String name() {
return name;
}
@@ -130,10 +99,5 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
return metaData;
}
- public Type type() {
- // NORELEASE remove this method
- throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
- }
-
public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java
deleted file mode 100644
index ad1dc73f6e..0000000000
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.pipeline;
-
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.StreamInput;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.unmodifiableMap;
-
-/**
- * A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that
- * know how to stream themselves.
- */
-public class PipelineAggregatorStreams {
- private static Map<BytesReference, Stream> streams = emptyMap();
-
- /**
- * A stream that knows how to read an aggregation from the input.
- */
- public interface Stream {
- PipelineAggregator readResult(StreamInput in) throws IOException;
- }
-
- /**
- * Registers the given stream and associate it with the given types.
- *
- * @param stream The streams to register
- * @param types The types associated with the streams
- */
- public static synchronized void registerStream(Stream stream, BytesReference... types) {
- Map<BytesReference, Stream> newStreams = new HashMap<>(streams);
- for (BytesReference type : types) {
- newStreams.put(type, stream);
- }
- streams = unmodifiableMap(newStreams);
- }
-
- /**
- * Returns the stream that is registered for the given type
- *
- * @param type The given type
- * @return The associated stream
- */
- public static Stream stream(BytesReference type) {
- return streams.get(type);
- }
-
-}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
index 79ec761b0e..b78691455d 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
@@ -36,11 +36,6 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public abstract class SiblingPipelineAggregator extends PipelineAggregator {
-
- protected SiblingPipelineAggregator() { // NOCOMMIT remove me
- super();
- }
-
protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
}
@@ -83,7 +78,7 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
return singleBucketAgg.create(new InternalAggregations(aggs));
} else {
throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation ["
- + aggregation.type().name() + "]");
+ + aggregation.getWriteableName() + "]");
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index 772f91a525..5125950a41 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -23,7 +23,6 @@ import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
@@ -31,15 +30,15 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
@@ -220,21 +219,8 @@ public class QuerySearchResult extends QuerySearchResultProvider {
if (in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
- if (in.readBoolean()) {
- int size = in.readVInt();
- List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
- if (in.readBoolean()) {
- pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class));
- } else {
- BytesReference type = in.readBytesReference();
- PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
- pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
- }
- }
- this.pipelineAggregators = pipelineAggregators;
- }
+ pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
+ .collect(Collectors.toList());
if (in.readBoolean()) {
suggest = Suggest.readSuggest(in);
}
@@ -272,24 +258,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
out.writeBoolean(true);
aggregations.writeTo(out);
}
- if (pipelineAggregators == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeVInt(pipelineAggregators.size());
- for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
- // NORELEASE temporary hack to support old style streams and new style NamedWriteable
- try {
- pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
- out.writeBoolean(true);
- out.writeNamedWriteable(pipelineAggregator);
- } catch (UnsupportedOperationException e) {
- out.writeBoolean(false);
- out.writeBytesReference(pipelineAggregator.type().stream());
- pipelineAggregator.writeTo(out);
- }
- }
- }
+ out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators);
if (suggest == null) {
out.writeBoolean(false);
} else {
diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
index 846c8dca37..dcd612198d 100644
--- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
+++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
@@ -337,8 +337,26 @@ public class BytesStreamsTests extends ESTestCase {
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry);
assertEquals(in.available(), bytes.length);
BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class);
- assertEquals(namedWriteableOut, namedWriteableIn);
- assertEquals(in.available(), 0);
+ assertEquals(namedWriteableIn, namedWriteableOut);
+ assertEquals(0, in.available());
+ }
+
+ public void testNamedWriteableList() throws IOException {
+ NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
+ namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);
+ int size = between(0, 100);
+ List<BaseNamedWriteable> expected = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ expected.add(new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
+ }
+
+ try (BytesStreamOutput out = new BytesStreamOutput()) {
+ out.writeNamedWriteableList(expected);
+ try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) {
+ assertEquals(expected, in.readNamedWriteableList(BaseNamedWriteable.class));
+ assertEquals(0, in.available());
+ }
+ }
}
public void testNamedWriteableDuplicates() throws IOException {
diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java
index 851e377fc8..d97fb64a16 100644
--- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java
+++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java
@@ -54,6 +54,7 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
* Read from a stream.
*/
public InternalMatrixStats(StreamInput in) throws IOException {
+ super(in);
stats = in.readOptionalWriteable(RunningStats::new);
results = in.readOptionalWriteable(MatrixStatsResults::new);
}