diff options
author | Nik Everett <nik9000@gmail.com> | 2016-07-19 14:50:49 -0400 |
---|---|---|
committer | Nik Everett <nik9000@gmail.com> | 2016-07-20 09:46:04 -0400 |
commit | fc4b4396353fffc3b78b1e582d7aeef8ff98d95d (patch) | |
tree | 5e4a411d2ac115928551c043c662dfd9164da4df /core/src/main/java/org | |
parent | a4f09d2b814595d7cca111cbc59d72fcf2dfb57d (diff) |
Remove AggregationStreams and friends
* Remove outdated aggregation registration method
* Remove AggregationStreams
* Adds StreamInput#readNamedWriteableList and
StreamOutput#writeNamedWriteableList convenience methods. We strive to
make the reading and writing from the streams terse so they are easier
to scan visually.
* Remove PipelineAggregatorStreams
* Remove stream info from InternalAggreation.Type
* Remove InternalAggregation#type
* Remove Streamable from PipelineAggregator
* Remove Streamable from MultiBucketsAggregation.Bucket
Diffstat (limited to 'core/src/main/java/org')
22 files changed, 47 insertions, 425 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 0f6ab65db1..4c5f9757ca 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -836,6 +836,18 @@ public abstract class StreamInput extends InputStream { return builder; } + /** + * Reads a list of {@link NamedWriteable}s. + */ + public <T extends NamedWriteable> List<T> readNamedWriteableList(Class<T> categoryClass) throws IOException { + int count = readVInt(); + List<T> builder = new ArrayList<>(count); + for (int i=0; i<count; i++) { + builder.add(readNamedWriteable(categoryClass)); + } + return builder; + } + public static StreamInput wrap(byte[] bytes) { return wrap(bytes, 0, bytes.length); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 3ee6e94a4a..24350936fa 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -835,4 +835,14 @@ public abstract class StreamOutput extends OutputStream { obj.writeTo(this); } } + + /** + * Writes a list of {@link NamedWriteable} objects. + */ + public void writeNamedWriteableList(List<? extends NamedWriteable> list) throws IOException { + writeVInt(list.size()); + for (NamedWriteable obj: list) { + writeNamedWriteable(obj); + } + } } diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index e8ddc5298a..04c77d9b1e 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -486,14 +486,6 @@ public class SearchModule extends AbstractModule { } } - public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader, - PipelineAggregator.Parser aggregationParser, ParseField aggregationName) { - // NORELEASE remove me in favor of the above method - pipelineAggregationParserRegistry.register(aggregationParser, aggregationName); - namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader); - } - - @Override protected void configure() { if (false == transportClient) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index f3ad47910e..5dc2937431 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -154,8 +154,8 @@ public class AggregationPhase implements SearchPhase { siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator); } else { throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name() - + "] of type [" + pipelineAggregator.type().name() - + "]. Only sibling pipeline aggregations are allowed at the top level"); + + "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are " + + "allowed at the top level"); } } context.queryResult().pipelineAggregators(siblingPipelineAggregators); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java deleted file mode 100644 index 81228be260..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationStreams.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; - -/** - * A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that - * know how to stream themselves. - */ -public class AggregationStreams { - private static Map<BytesReference, Stream> streams = emptyMap(); - - /** - * A stream that knows how to read an aggregation from the input. - */ - public interface Stream { - InternalAggregation readResult(StreamInput in) throws IOException; - } - - /** - * Registers the given stream and associate it with the given types. - * - * @param stream The streams to register - * @param types The types associated with the streams - */ - public static synchronized void registerStream(Stream stream, BytesReference... types) { - Map<BytesReference, Stream> newStreams = new HashMap<>(streams); - for (BytesReference type : types) { - newStreams.put(type, stream); - } - streams = unmodifiableMap(newStreams); - } - - /** - * Returns the stream that is registered for the given type - * - * @param type The given type - * @return The associated stream - */ - public static Stream stream(BytesReference type) { - return streams.get(type); - } - -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 52a0ad1b17..b7635d3dc3 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -20,55 +20,35 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.support.AggregationPath; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; /** * An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations. */ -public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable { - // NORELEASE remove Streamable - +public abstract class InternalAggregation implements Aggregation, ToXContent, NamedWriteable { /** * The aggregation type that holds all the string types that are associated with an aggregation: * <ul> * <li>name - used as the parser type</li> - * <li>stream - used as the stream type</li> * </ul> */ public static class Type { - - private String name; - private BytesReference stream; + private final String name; public Type(String name) { - this(name, new BytesArray(name)); - } - - public Type(String name, String stream) { - this(name, new BytesArray(stream)); - } - - public Type(String name, BytesReference stream) { this.name = name; - this.stream = stream; } /** @@ -79,14 +59,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St return name; } - /** - * @return The name of the stream type (used for registering the aggregation stream - * (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, BytesReference...)}). - */ - public BytesReference stream() { - return stream; - } - @Override public String toString() { return name; @@ -118,15 +90,11 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St } } + protected final String name; - protected String name; - - protected Map<String, Object> metaData; - - private List<PipelineAggregator> pipelineAggregators; + protected final Map<String, Object> metaData; - /** Constructs an un initialized addAggregation (used for serialization) **/ - protected InternalAggregation() {} // NORELEASE remove when removing Streamable + private final List<PipelineAggregator> pipelineAggregators; /** * Constructs an get with a given name. @@ -145,82 +113,19 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St protected InternalAggregation(StreamInput in) throws IOException { name = in.readString(); metaData = in.readMap(); - int size = in.readVInt(); - if (size == 0) { - pipelineAggregators = Collections.emptyList(); - } else { - pipelineAggregators = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - if (in.readBoolean()) { - pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class)); - } else { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add(pipelineAggregator); - } - } - } - } - - @Override - public final void readFrom(StreamInput in) throws IOException { - try { - getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams - assert false : "Used reading constructor instead"; - } catch (UnsupportedOperationException e) { - // OK - } - name = in.readString(); - metaData = in.readMap(); - int size = in.readVInt(); - if (size == 0) { - pipelineAggregators = Collections.emptyList(); - } else { - pipelineAggregators = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - if (in.readBoolean()) { - pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class)); - } else { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add(pipelineAggregator); - } - } - } - doReadFrom(in); - } - - protected void doReadFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable + pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class); } @Override public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metaData); - out.writeVInt(pipelineAggregators.size()); - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - // NORELEASE temporary hack to support old style streams and new style NamedWriteable - try { - pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams. - out.writeBoolean(true); - out.writeNamedWriteable(pipelineAggregator); - } catch (UnsupportedOperationException e) { - out.writeBoolean(false); - out.writeBytesReference(pipelineAggregator.type().stream()); - pipelineAggregator.writeTo(out); - } - } + out.writeNamedWriteableList(pipelineAggregators); doWriteTo(out); } protected abstract void doWriteTo(StreamOutput out) throws IOException; - @Override - public String getWriteableName() { - // NORELEASE remove me when all InternalAggregations override it - throw new UnsupportedOperationException("Override on every class"); - } @Override public String getName() { @@ -228,14 +133,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St } /** - * @return The {@link Type} of this aggregation - */ - public Type type() { - // NORELEASE remove this method - throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead"); - } - - /** * Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given * aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing, * try reusing an existing instance (typically the first in the given list) to save on redundant object diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 3628133580..66e45156ca 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -198,41 +197,15 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl @Override public void readFrom(StreamInput in) throws IOException { - int size = in.readVInt(); - if (size == 0) { - aggregations = Collections.emptyList(); + aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)); + if (aggregations.isEmpty()) { aggregationsAsMap = emptyMap(); - } else { - aggregations = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time - if (in.readBoolean()) { - aggregations.add(in.readNamedWriteable(InternalAggregation.class)); - } else { - BytesReference type = in.readBytesReference(); - InternalAggregation aggregation = AggregationStreams.stream(type).readResult(in); - aggregations.add(aggregation); - } - } } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(aggregations.size()); - for (Aggregation aggregation : aggregations) { - InternalAggregation internal = (InternalAggregation) aggregation; - // NORELEASE Temporary hack to support old style streams and new style NamedWriteable at the same time - try { - internal.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams. - out.writeBoolean(true); - out.writeNamedWriteable(internal); - } catch (UnsupportedOperationException e) { - out.writeBoolean(false); - out.writeBytesReference(internal.type().stream()); - internal.writeTo(out); - } - } + out.writeNamedWriteableList(aggregations); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index 46867d24f4..f1e8b7358c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -29,10 +29,6 @@ import java.util.Map; public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation, B extends InternalMultiBucketAggregation.InternalBucket> extends InternalAggregation implements MultiBucketsAggregation { - - public InternalMultiBucketAggregation() { - } - public InternalMultiBucketAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 9b88ab4705..e8b0468006 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -39,8 +39,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio private long docCount; private InternalAggregations aggregations; - protected InternalSingleBucketAggregation() {} // for serialization - /** * Creates a single bucket aggregation. * @@ -64,12 +62,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio } @Override - protected void doReadFrom(StreamInput in) throws IOException { - docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); - } - - @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeVLong(docCount); aggregations.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java index 39603c9ab4..2d8d26dd35 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java @@ -40,9 +40,7 @@ public interface MultiBucketsAggregation extends Aggregation { * A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified * by a key, and can potentially hold sub-aggregations computed over all documents in it. */ - public interface Bucket extends HasAggregations, ToXContent, Streamable, Writeable { - // NORELEASE remove Streamable - + public interface Bucket extends HasAggregations, ToXContent, Writeable { /** * @return The key associated with the bucket */ @@ -66,12 +64,6 @@ public interface MultiBucketsAggregation extends Aggregation { Object getProperty(String containingAggName, List<String> path); - @Override - default void readFrom(StreamInput in) throws IOException { - // NORELEASE remove me when no Buckets override it - throw new UnsupportedOperationException("Prefer the Writeable interface"); - } - static class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> { private final AggregationPath path; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index bb0acd76be..0dfc9c8e53 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -35,7 +35,7 @@ import java.io.IOException; public class InternalDateHistogram { public static final Factory HISTOGRAM_FACTORY = new Factory(); - static final Type TYPE = new Type("date_histogram", "dhisto"); + static final Type TYPE = new Type("date_histogram"); static class Bucket extends InternalHistogram.Bucket { Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter, diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 8891f75d4b..29ae9a1016 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -303,11 +303,6 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter } @Override - public Type type() { - return TYPE; - } - - @Override public List<B> getBuckets() { return buckets; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index e7e7533088..54ee27bfa9 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -173,13 +173,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan } @Override - public void readFrom(StreamInput in) throws IOException { - - } - - @Override public void writeTo(StreamOutput out) throws IOException { - } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index c4ade93e60..00a066c11b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -242,11 +242,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override protected void writeTermTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 56d69d814f..a8b4c44ce4 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -196,13 +196,9 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int !terms.getClass().equals(UnmappedTerms.class)) { // control gets into this loop when the same field name against which the query is executed // is of different types in different indices. - throw new AggregationExecutionException("Merging/Reducing the aggregations failed " + - "when computing the aggregation [ Name: " + - referenceTerms.getName() + ", Type: " + - referenceTerms.type() + " ]" + " because: " + - "the field you gave in the aggregation query " + - "existed as two different types " + - "in two different indices"); + throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation [" + + referenceTerms.getName() + "] because the field you gave in the aggregation query existed as two different " + + "types in two different indices"); } otherDocCount += terms.getSumOfOtherDocCounts(); final long thisAggDocCountError; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java index a5c64c5cde..ded69d9f75 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java @@ -28,9 +28,6 @@ import java.util.List; import java.util.Map; public abstract class InternalMetricsAggregation extends InternalAggregation { - - protected InternalMetricsAggregation() {} // NORELEASE remove when we remove streamable - protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java index 3677150097..901c52a232 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java @@ -36,9 +36,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA protected DocValueFormat format = DEFAULT_FORMAT; public abstract static class SingleValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.SingleValue { - - protected SingleValue() {} // NORELEASE remove when we remove Streamable - protected SingleValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); } @@ -69,9 +66,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA } public abstract static class MultiValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.MultiValue { - - protected MultiValue() {} - protected MultiValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); } @@ -101,8 +95,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA } } - private InternalNumericMetricsAggregation() {} // NORELEASE remove when we remove Streamable - private InternalNumericMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { super(name, pipelineAggregators, metaData); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java index 7bfd54eb64..02953abc2d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 446b56d07e..f49ca1dd39 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -24,19 +24,15 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import java.io.IOException; import java.util.Map; -public abstract class PipelineAggregator implements Streamable, NamedWriteable { - // NORELEASE remove Streamable - +public abstract class PipelineAggregator implements NamedWriteable { /** * Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}. */ @@ -66,9 +62,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable { private String[] bucketsPaths; private Map<String, Object> metaData; - protected PipelineAggregator() { // for Serialisation - } - protected PipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) { this.name = name; this.bucketsPaths = bucketsPaths; @@ -85,26 +78,8 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable { } @Override - public final void readFrom(StreamInput in) throws IOException { - try { - getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams - assert false : "Used reading constructor instead"; - } catch (UnsupportedOperationException e) { - // OK - } - name = in.readString(); - bucketsPaths = in.readStringArray(); - metaData = in.readMap(); - doReadFrom(in); - } - - protected void doReadFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable - } - - @Override public final void writeTo(StreamOutput out) throws IOException { - out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable + out.writeString(name); out.writeStringArray(bucketsPaths); out.writeMap(metaData); doWriteTo(out); @@ -112,12 +87,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable { protected abstract void doWriteTo(StreamOutput out) throws IOException; - @Override - public String getWriteableName() { - // NORELEASE remove me when all InternalAggregations override it - throw new UnsupportedOperationException("Override on every class"); - } - public String name() { return name; } @@ -130,10 +99,5 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable { return metaData; } - public Type type() { - // NORELEASE remove this method - throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead"); - } - public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java deleted file mode 100644 index ad1dc73f6e..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorStreams.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.pipeline; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; - -/** - * A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that - * know how to stream themselves. - */ -public class PipelineAggregatorStreams { - private static Map<BytesReference, Stream> streams = emptyMap(); - - /** - * A stream that knows how to read an aggregation from the input. - */ - public interface Stream { - PipelineAggregator readResult(StreamInput in) throws IOException; - } - - /** - * Registers the given stream and associate it with the given types. - * - * @param stream The streams to register - * @param types The types associated with the streams - */ - public static synchronized void registerStream(Stream stream, BytesReference... types) { - Map<BytesReference, Stream> newStreams = new HashMap<>(streams); - for (BytesReference type : types) { - newStreams.put(type, stream); - } - streams = unmodifiableMap(newStreams); - } - - /** - * Returns the stream that is registered for the given type - * - * @param type The given type - * @return The associated stream - */ - public static Stream stream(BytesReference type) { - return streams.get(type); - } - -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index 79ec761b0e..b78691455d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -36,11 +36,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; public abstract class SiblingPipelineAggregator extends PipelineAggregator { - - protected SiblingPipelineAggregator() { // NOCOMMIT remove me - super(); - } - protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) { super(name, bucketsPaths, metaData); } @@ -83,7 +78,7 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator { return singleBucketAgg.create(new InternalAggregations(aggs)); } else { throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation [" - + aggregation.type().name() + "]"); + + aggregation.getWriteableName() + "]"); } } diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 772f91a525..5125950a41 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -23,7 +23,6 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; @@ -31,15 +30,15 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; @@ -220,21 +219,8 @@ public class QuerySearchResult extends QuerySearchResultProvider { if (in.readBoolean()) { aggregations = InternalAggregations.readAggregations(in); } - if (in.readBoolean()) { - int size = in.readVInt(); - List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time - if (in.readBoolean()) { - pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class)); - } else { - BytesReference type = in.readBytesReference(); - PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in); - pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator); - } - } - this.pipelineAggregators = pipelineAggregators; - } + pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a) + .collect(Collectors.toList()); if (in.readBoolean()) { suggest = Suggest.readSuggest(in); } @@ -272,24 +258,7 @@ public class QuerySearchResult extends QuerySearchResultProvider { out.writeBoolean(true); aggregations.writeTo(out); } - if (pipelineAggregators == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeVInt(pipelineAggregators.size()); - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - // NORELEASE temporary hack to support old style streams and new style NamedWriteable - try { - pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams. - out.writeBoolean(true); - out.writeNamedWriteable(pipelineAggregator); - } catch (UnsupportedOperationException e) { - out.writeBoolean(false); - out.writeBytesReference(pipelineAggregator.type().stream()); - pipelineAggregator.writeTo(out); - } - } - } + out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators); if (suggest == null) { out.writeBoolean(false); } else { |