diff options
author | Adrien Grand <jpountz@gmail.com> | 2016-04-18 16:11:11 +0200 |
---|---|---|
committer | Adrien Grand <jpountz@gmail.com> | 2016-05-13 17:22:01 +0200 |
commit | 638da06c1d5121622cf0d47c2c55b4a84a794909 (patch) | |
tree | 91aba8741b9b044ce03c4fcf31501646c6f88716 /core/src/main/java/org/elasticsearch/search/aggregations/bucket/range | |
parent | 520697eb14d90df69f5e27f6b06934ddfdca62a2 (diff) |
Add back support for `ip` range aggregations. #17859
This commit adds support for range aggregations on `ip` fields. However it will
only work on 5.x indices.
Closes #17700
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/bucket/range')
9 files changed, 1071 insertions, 549 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java new file mode 100644 index 0000000000..87a3e917dd --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java @@ -0,0 +1,234 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.range; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +/** A range aggregator for values that are stored in SORTED_SET doc values. */ +public final class BinaryRangeAggregator extends BucketsAggregator { + + public static class Range { + + final String key; + final BytesRef from, to; + + public Range(String key, BytesRef from, BytesRef to) { + this.key = key; + this.from = from; + this.to = to; + } + } + + static final Comparator<Range> RANGE_COMPARATOR = (a, b) -> { + int cmp = compare(a.from, b.from, 1); + if (cmp == 0) { + cmp = compare(a.to, b.to, -1); + } + return cmp; + }; + + private static int compare(BytesRef a, BytesRef b, int m) { + return a == null + ? b == null ? 0 : -m + : b == null ? m : a.compareTo(b); + } + + final ValuesSource.Bytes valuesSource; + final DocValueFormat format; + final boolean keyed; + final Range[] ranges; + + public BinaryRangeAggregator(String name, AggregatorFactories factories, + ValuesSource.Bytes valuesSource, DocValueFormat format, + List<Range> ranges, boolean keyed, AggregationContext aggregationContext, + Aggregator parent, List<PipelineAggregator> pipelineAggregators, + Map<String, Object> metaData) throws IOException { + super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); + this.valuesSource = valuesSource; + this.format = format; + this.keyed = keyed; + this.ranges = ranges.toArray(new Range[0]); + Arrays.sort(this.ranges, RANGE_COMPARATOR); + + } + + @Override + public boolean needsScores() { + return (valuesSource != null && valuesSource.needsScores()) || super.needsScores(); + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals) { + SortedSetDocValues values = ((ValuesSource.Bytes.WithOrdinals) valuesSource).ordinalsValues(ctx); + return new SortedSetRangeLeafCollector(values, ranges, sub) { + @Override + protected void doCollect(LeafBucketCollector sub, int doc, long bucket) throws IOException { + collectBucket(sub, doc, bucket); + } + }; + } + throw new IllegalArgumentException("binary range aggregation expects a values source that supports ordinals"); + } + + static abstract class SortedSetRangeLeafCollector extends LeafBucketCollectorBase { + + final long[] froms, tos, maxTos; + final SortedSetDocValues values; + final LeafBucketCollector sub; + + SortedSetRangeLeafCollector(SortedSetDocValues values, + Range[] ranges, LeafBucketCollector sub) { + super(sub, values); + for (int i = 1; i < ranges.length; ++i) { + if (RANGE_COMPARATOR.compare(ranges[i-1], ranges[i]) > 0) { + throw new IllegalArgumentException("Ranges must be sorted"); + } + } + this.values = values; + this.sub = sub; + froms = new long[ranges.length]; + tos = new long[ranges.length]; // inclusive + maxTos = new long[ranges.length]; + for (int i = 0; i < ranges.length; ++i) { + if (ranges[i].from == null) { + froms[i] = 0; + } else { + froms[i] = values.lookupTerm(ranges[i].from); + if (froms[i] < 0) { + froms[i] = -1 - froms[i]; + } + } + if (ranges[i].to == null) { + tos[i] = values.getValueCount() - 1; + } else { + long ord = values.lookupTerm(ranges[i].to); + if (ord < 0) { + tos[i] = -2 - ord; + } else { + tos[i] = ord - 1; + } + } + } + maxTos[0] = tos[0]; + for (int i = 1; i < tos.length; ++i) { + maxTos[i] = Math.max(maxTos[i-1], tos[i]); + } + } + + @Override + public void collect(int doc, long bucket) throws IOException { + values.setDocument(doc); + int lo = 0; + for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) { + lo = collect(doc, ord, bucket, lo); + } + } + + private int collect(int doc, long ord, long bucket, int lowBound) throws IOException { + int lo = lowBound, hi = froms.length - 1; // all candidates are between these indexes + int mid = (lo + hi) >>> 1; + while (lo <= hi) { + if (ord < froms[mid]) { + hi = mid - 1; + } else if (ord > maxTos[mid]) { + lo = mid + 1; + } else { + break; + } + mid = (lo + hi) >>> 1; + } + if (lo > hi) return lo; // no potential candidate + + // binary search the lower bound + int startLo = lo, startHi = mid; + while (startLo <= startHi) { + final int startMid = (startLo + startHi) >>> 1; + if (ord > maxTos[startMid]) { + startLo = startMid + 1; + } else { + startHi = startMid - 1; + } + } + + // binary search the upper bound + int endLo = mid, endHi = hi; + while (endLo <= endHi) { + final int endMid = (endLo + endHi) >>> 1; + if (ord < froms[endMid]) { + endHi = endMid - 1; + } else { + endLo = endMid + 1; + } + } + + assert startLo == lowBound || ord > maxTos[startLo - 1]; + assert endHi == froms.length - 1 || ord < froms[endHi + 1]; + + for (int i = startLo; i <= endHi; ++i) { + if (ord <= tos[i]) { + doCollect(sub, doc, bucket * froms.length + i); + } + } + + return endHi + 1; + } + + protected abstract void doCollect(LeafBucketCollector sub, int doc, long bucket) throws IOException; + } + + @Override + public InternalAggregation buildAggregation(long bucket) throws IOException { + InternalBinaryRange.Bucket[] buckets = new InternalBinaryRange.Bucket[ranges.length]; + for (int i = 0; i < buckets.length; ++i) { + long bucketOrd = bucket * ranges.length + i; + buckets[i] = new InternalBinaryRange.Bucket(format, keyed, + ranges[i].key, ranges[i].from, ranges[i].to, + bucketDocCount(bucketOrd), bucketAggregations(bucketOrd)); + } + return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalBinaryRange(name, format, keyed, new InternalBinaryRange.Bucket[0], pipelineAggregators(), metaData()); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java new file mode 100644 index 0000000000..fda822cf11 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.range; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +public class BinaryRangeAggregatorFactory + extends ValuesSourceAggregatorFactory<ValuesSource.Bytes, BinaryRangeAggregatorFactory> { + + private final List<BinaryRangeAggregator.Range> ranges; + private final boolean keyed; + + public BinaryRangeAggregatorFactory(String name, Type type, + ValuesSourceConfig<ValuesSource.Bytes> config, + List<BinaryRangeAggregator.Range> ranges, boolean keyed, + AggregationContext context, + AggregatorFactory<?> parent, Builder subFactoriesBuilder, + Map<String, Object> metaData) throws IOException { + super(name, type, config, context, parent, subFactoriesBuilder, metaData); + this.ranges = ranges; + this.keyed = keyed; + } + + @Override + protected Aggregator createUnmapped(Aggregator parent, + List<PipelineAggregator> pipelineAggregators, + Map<String, Object> metaData) throws IOException { + return new BinaryRangeAggregator(name, factories, null, config.format(), + ranges, keyed, context, parent, pipelineAggregators, metaData); + } + + @Override + protected Aggregator doCreateInternal(ValuesSource.Bytes valuesSource, + Aggregator parent, + boolean collectsFromSingleBucket, + List<PipelineAggregator> pipelineAggregators, + Map<String, Object> metaData) throws IOException { + return new BinaryRangeAggregator(name, factories, valuesSource, config.format(), + ranges, keyed, context, parent, pipelineAggregators, metaData); + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java new file mode 100644 index 0000000000..f3f1763113 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -0,0 +1,311 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.range; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +/** A range aggregation for data that is encoded in doc values using a binary representation. */ +public final class InternalBinaryRange + extends InternalMultiBucketAggregation<InternalBinaryRange, InternalBinaryRange.Bucket> + implements Range { + + public static final Type TYPE = new Type("binary_range"); + + private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalBinaryRange readResult(StreamInput in) throws IOException { + InternalBinaryRange range = new InternalBinaryRange(); + range.readFrom(in); + return range; + } + }; + + private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket bucket = new Bucket(context.format(), context.keyed()); + bucket.readFrom(in); + return bucket; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.format(bucket.format); + context.keyed(bucket.keyed); + return context; + } + }; + + public static void registerStream() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); + } + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Range.Bucket { + + private final transient DocValueFormat format; + private final transient boolean keyed; + private String key; + private BytesRef from, to; + private long docCount; + private InternalAggregations aggregations; + + public Bucket(DocValueFormat format, boolean keyed, String key, BytesRef from, BytesRef to, + long docCount, InternalAggregations aggregations) { + this(format, keyed); + this.key = key; + this.from = from; + this.to = to; + this.docCount = docCount; + this.aggregations = aggregations; + } + + // for serialization + private Bucket(DocValueFormat format, boolean keyed) { + this.format = format; + this.keyed = keyed; + } + + @Override + public Object getKey() { + return key; + } + + @Override + public String getKeyAsString() { + return key; + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String key = this.key; + if (keyed) { + if (key == null) { + StringBuilder keyBuilder = new StringBuilder(); + keyBuilder.append(from == null ? "*" : format.format(from)); + keyBuilder.append("-"); + keyBuilder.append(to == null ? "*" : format.format(to)); + key = keyBuilder.toString(); + } + builder.startObject(key); + } else { + builder.startObject(); + if (key != null) { + builder.field(CommonFields.KEY, key); + } + } + if (from != null) { + builder.field(CommonFields.FROM, getFrom()); + } + if (to != null) { + builder.field(CommonFields.TO, getTo()); + } + builder.field(CommonFields.DOC_COUNT, docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + key = in.readOptionalString(); + if (in.readBoolean()) { + from = in.readBytesRef(); + } else { + from = null; + } + if (in.readBoolean()) { + to = in.readBytesRef(); + } else { + to = null; + } + docCount = in.readLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(key); + out.writeBoolean(from != null); + if (from != null) { + out.writeBytesRef(from); + } + out.writeBoolean(to != null); + if (to != null) { + out.writeBytesRef(to); + } + out.writeLong(docCount); + aggregations.writeTo(out); + } + + @Override + public Object getFrom() { + return getFromAsString(); + } + + @Override + public String getFromAsString() { + return from == null ? null : format.format(from); + } + + @Override + public Object getTo() { + return getToAsString(); + } + + @Override + public String getToAsString() { + return to == null ? null : format.format(to); + } + + } + + private DocValueFormat format; + private boolean keyed; + private Bucket[] buckets; + + public InternalBinaryRange(String name, DocValueFormat format, boolean keyed, Bucket[] buckets, + List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { + super(name, pipelineAggregators, metaData); + this.format = format; + this.keyed = keyed; + this.buckets = buckets; + } + + private InternalBinaryRange() {} // for serialization + + @Override + public List<Range.Bucket> getBuckets() { + return Arrays.asList(buckets); + } + + @Override + public InternalBinaryRange create(List<Bucket> buckets) { + return new InternalBinaryRange(name, format, keyed, buckets.toArray(new Bucket[0]), + pipelineAggregators(), metaData); + } + + @Override + public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { + return new Bucket(format, keyed, prototype.key, prototype.from, prototype.to, prototype.docCount, aggregations); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { + long[] docCounts = new long[buckets.length]; + InternalAggregations[][] aggs = new InternalAggregations[buckets.length][]; + for (int i = 0; i < aggs.length; ++i) { + aggs[i] = new InternalAggregations[aggregations.size()]; + } + for (int i = 0; i < aggregations.size(); ++i) { + InternalBinaryRange range = (InternalBinaryRange) aggregations.get(i); + if (range.buckets.length != buckets.length) { + throw new IllegalStateException("Expected " + buckets.length + " buckets, but got " + range.buckets.length); + } + for (int j = 0; j < buckets.length; ++j) { + Bucket bucket = range.buckets[j]; + docCounts[j] += bucket.docCount; + aggs[j][i] = bucket.aggregations; + } + } + Bucket[] buckets = new Bucket[this.buckets.length]; + for (int i = 0; i < buckets.length; ++i) { + Bucket b = this.buckets[i]; + buckets[i] = new Bucket(format, keyed, b.key, b.from, b.to, docCounts[i], + InternalAggregations.reduce(Arrays.asList(aggs[i]), reduceContext)); + } + return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, + Params params) throws IOException { + if (keyed) { + builder.startObject(CommonFields.BUCKETS); + } else { + builder.startArray(CommonFields.BUCKETS); + } + for (Bucket range : buckets) { + range.toXContent(builder, params); + } + if (keyed) { + builder.endObject(); + } else { + builder.endArray(); + } + return builder; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeBoolean(keyed); + out.writeVInt(buckets.length); + for (Bucket bucket : buckets) { + bucket.writeTo(out); + } + } + + @Override + protected void doReadFrom(StreamInput in) throws IOException { + format = in.readNamedWriteable(DocValueFormat.class); + keyed = in.readBoolean(); + Bucket[] buckets = new Bucket[in.readVInt()]; + for (int i = 0; i < buckets.length; ++i) { + Bucket bucket = new Bucket(format, keyed); + bucket.readFrom(in); + buckets[i] = bucket; + } + this.buckets = buckets; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeAggregatorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeAggregatorBuilder.java new file mode 100644 index 0000000000..c56a2952f8 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeAggregatorBuilder.java @@ -0,0 +1,330 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.range.ip; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import org.apache.lucene.document.InetAddressPoint; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.bucket.range.BinaryRangeAggregator; +import org.elasticsearch.search.aggregations.bucket.range.BinaryRangeAggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; + + +public final class IpRangeAggregatorBuilder + extends ValuesSourceAggregatorBuilder<ValuesSource.Bytes, IpRangeAggregatorBuilder> { + private static final String NAME = "ip_range"; + public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); + private static final InternalAggregation.Type TYPE = new InternalAggregation.Type(NAME); + + public static class Range implements ToXContent { + private final String key; + private final String from; + private final String to; + + Range(String key, String from, String to) { + if (from != null) { + InetAddresses.forString(from); + } + if (to != null) { + InetAddresses.forString(to); + } + this.key = key; + this.from = from; + this.to = to; + } + + Range(String key, String mask) { + String[] splits = mask.split("/"); + if (splits.length != 2) { + throw new IllegalArgumentException("Expected [ip/prefix_length] but got [" + mask + + "], which contains zero or more than one [/]"); + } + InetAddress value = InetAddresses.forString(splits[0]); + int prefixLength = Integer.parseInt(splits[1]); + // copied from InetAddressPoint.newPrefixQuery + if (prefixLength < 0 || prefixLength > 8 * value.getAddress().length) { + throw new IllegalArgumentException("illegal prefixLength [" + prefixLength + + "] in [" + mask + "]. Must be 0-32 for IPv4 ranges, 0-128 for IPv6 ranges"); + } + // create the lower value by zeroing out the host portion, upper value by filling it with all ones. + byte lower[] = value.getAddress(); + byte upper[] = value.getAddress(); + for (int i = prefixLength; i < 8 * lower.length; i++) { + int m = 1 << (7 - (i & 7)); + lower[i >> 3] &= ~m; + upper[i >> 3] |= m; + } + this.key = key; + try { + this.from = InetAddresses.toAddrString(InetAddress.getByAddress(lower)); + this.to = InetAddresses.toAddrString(InetAddress.getByAddress(upper)); + } catch (UnknownHostException bogus) { + throw new AssertionError(bogus); + } + } + + private Range(StreamInput in) throws IOException { + this.key = in.readOptionalString(); + this.from = in.readOptionalString(); + this.to = in.readOptionalString(); + } + + void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(key); + out.writeOptionalString(from); + out.writeOptionalString(to); + } + + public String getKey() { + return key; + } + + public String getFrom() { + return from; + } + + public String getTo() { + return to; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Range that = (Range) obj; + return Objects.equals(key, that.key) + && Objects.equals(from, that.from) + && Objects.equals(to, that.to); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), key, from, to); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (key != null) { + builder.field(RangeAggregator.Range.KEY_FIELD.getPreferredName(), key); + } + if (from != null) { + builder.field(RangeAggregator.Range.FROM_FIELD.getPreferredName(), from); + } + if (to != null) { + builder.field(RangeAggregator.Range.TO_FIELD.getPreferredName(), to); + } + builder.endObject(); + return builder; + } + } + + private boolean keyed = false; + private List<Range> ranges = new ArrayList<>(); + + public IpRangeAggregatorBuilder(String name) { + super(name, TYPE, ValuesSourceType.BYTES, ValueType.IP); + } + + @Override + public String getWriteableName() { + return NAME; + } + + public IpRangeAggregatorBuilder keyed(boolean keyed) { + this.keyed = keyed; + return this; + } + + public boolean keyed() { + return keyed; + } + + /** Get the current list or ranges that are configured on this aggregation. */ + public List<Range> getRanges() { + return Collections.unmodifiableList(ranges); + } + + /** Add a new {@link Range} to this aggregation. */ + public IpRangeAggregatorBuilder addRange(Range range) { + ranges.add(range); + return this; + } + + /** + * Add a new range to this aggregation. + * + * @param key + * the key to use for this range in the response + * @param from + * the lower bound on the distances, inclusive + * @param to + * the upper bound on the distances, exclusive + */ + public IpRangeAggregatorBuilder addRange(String key, String from, String to) { + addRange(new Range(key, from, to)); + return this; + } + + /** + * Add a new range to this aggregation using the CIDR notation. + */ + public IpRangeAggregatorBuilder addMaskRange(String key, String mask) { + return addRange(new Range(key, mask)); + } + + /** + * Same as {@link #addMaskRange(String, String)} but uses the mask itself as + * a key. + */ + public IpRangeAggregatorBuilder addMaskRange(String mask) { + return addRange(new Range(mask, mask)); + } + + /** + * Same as {@link #addRange(String, String, String)} but the key will be + * automatically generated. + */ + public IpRangeAggregatorBuilder addRange(String from, String to) { + return addRange(null, from, to); + } + + /** + * Same as {@link #addRange(String, String, String)} but there will be no + * lower bound. + */ + public IpRangeAggregatorBuilder addUnboundedTo(String key, String to) { + addRange(new Range(key, null, to)); + return this; + } + + /** + * Same as {@link #addUnboundedTo(String, String)} but the key will be + * generated automatically. + */ + public IpRangeAggregatorBuilder addUnboundedTo(String to) { + return addUnboundedTo(null, to); + } + + /** + * Same as {@link #addRange(String, String, String)} but there will be no + * upper bound. + */ + public IpRangeAggregatorBuilder addUnboundedFrom(String key, String from) { + addRange(new Range(key, from, null)); + return this; + } + + @Override + public IpRangeAggregatorBuilder script(Script script) { + throw new IllegalArgumentException("[ip_range] does not support scripts"); + } + + /** + * Same as {@link #addUnboundedFrom(String, String)} but the key will be + * generated automatically. + */ + public IpRangeAggregatorBuilder addUnboundedFrom(String from) { + return addUnboundedFrom(null, from); + } + + public IpRangeAggregatorBuilder(StreamInput in) throws IOException { + super(in, TYPE, ValuesSourceType.BYTES, ValueType.IP); + final int numRanges = in.readVInt(); + for (int i = 0; i < numRanges; ++i) { + addRange(new Range(in)); + } + keyed = in.readBoolean(); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeVInt(ranges.size()); + for (Range range : ranges) { + range.writeTo(out); + } + out.writeBoolean(keyed); + } + + private static BytesRef toBytesRef(String ip) { + if (ip == null) { + return null; + } + InetAddress address = InetAddresses.forString(ip); + byte[] bytes = InetAddressPoint.encode(address); + return new BytesRef(bytes); + } + + @Override + protected ValuesSourceAggregatorFactory<ValuesSource.Bytes, ?> innerBuild( + AggregationContext context, ValuesSourceConfig<ValuesSource.Bytes> config, + AggregatorFactory<?> parent, Builder subFactoriesBuilder) + throws IOException { + List<BinaryRangeAggregator.Range> ranges = new ArrayList<>(); + for (Range range : this.ranges) { + ranges.add(new BinaryRangeAggregator.Range(range.key, toBytesRef(range.from), toBytesRef(range.to))); + } + return new BinaryRangeAggregatorFactory(name, TYPE, config, ranges, + keyed, context, parent, subFactoriesBuilder, metaData); + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(RangeAggregator.RANGES_FIELD.getPreferredName(), ranges); + builder.field(RangeAggregator.KEYED_FIELD.getPreferredName(), keyed); + return builder; + } + + @Override + protected int innerHashCode() { + return Objects.hash(keyed, ranges); + } + + @Override + protected boolean innerEquals(Object obj) { + IpRangeAggregatorBuilder that = (IpRangeAggregatorBuilder) obj; + return keyed == that.keyed + && ranges.equals(that.ranges); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeParser.java new file mode 100644 index 0000000000..64ed77d42f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ip/IpRangeParser.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.range.ip; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.search.aggregations.support.AbstractValuesSourceParser.BytesValuesSourceParser; +import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator; +import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregatorBuilder.Range; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; + +/** + * A parser for ip range aggregations. + */ +public class IpRangeParser extends BytesValuesSourceParser { + + private static final ParseField MASK_FIELD = new ParseField("mask"); + + public IpRangeParser() { + super(false, false); + } + + @Override + protected ValuesSourceAggregatorBuilder<ValuesSource.Bytes, ?> createFactory( + String aggregationName, ValuesSourceType valuesSourceType, + ValueType targetValueType, Map<ParseField, Object> otherOptions) { + IpRangeAggregatorBuilder range = new IpRangeAggregatorBuilder(aggregationName); + @SuppressWarnings("unchecked") + Iterable<Range> ranges = (Iterable<Range>) otherOptions.get(RangeAggregator.RANGES_FIELD); + if (otherOptions.containsKey(RangeAggregator.RANGES_FIELD)) { + for (Range r : ranges) { + range.addRange(r); + } + } + if (otherOptions.containsKey(RangeAggregator.KEYED_FIELD)) { + range.keyed((Boolean) otherOptions.get(RangeAggregator.KEYED_FIELD)); + } + return range; + } + + private Range parseRange(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { + String key = null; + String from = null; + String to = null; + String mask = null; + + if (parser.currentToken() != Token.START_OBJECT) { + throw new ParsingException(parser.getTokenLocation(), "[ranges] must contain objects, but hit a " + parser.currentToken()); + } + while (parser.nextToken() != Token.END_OBJECT) { + if (parser.currentToken() == Token.FIELD_NAME) { + continue; + } + if (parseFieldMatcher.match(parser.currentName(), RangeAggregator.Range.KEY_FIELD)) { + key = parser.text(); + } else if (parseFieldMatcher.match(parser.currentName(), RangeAggregator.Range.FROM_FIELD)) { + from = parser.text(); + } else if (parseFieldMatcher.match(parser.currentName(), RangeAggregator.Range.TO_FIELD)) { + to = parser.text(); + } else if (parseFieldMatcher.match(parser.currentName(), MASK_FIELD)) { + mask = parser.text(); + } else { + throw new ParsingException(parser.getTokenLocation(), "Unexpected ip range parameter: [" + parser.currentName() + "]"); + } + } + if (mask != null) { + if (key == null) { + key = mask; + } + return new Range(key, mask); + } else { + return new Range(key, from, to); + } + } + + @Override + protected boolean token(String aggregationName, String currentFieldName, + Token token, XContentParser parser, + ParseFieldMatcher parseFieldMatcher, + Map<ParseField, Object> otherOptions) throws IOException { + if (parseFieldMatcher.match(currentFieldName, RangeAggregator.RANGES_FIELD)) { + if (parser.currentToken() != Token.START_ARRAY) { + throw new ParsingException(parser.getTokenLocation(), "[ranges] must be passed as an array, but got a " + token); + } + List<Range> ranges = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + Range range = parseRange(parser, parseFieldMatcher); + ranges.add(range); + } + otherOptions.put(RangeAggregator.RANGES_FIELD, ranges); + return true; + } else if (parseFieldMatcher.match(parser.currentName(), RangeAggregator.KEYED_FIELD)) { + otherOptions.put(RangeAggregator.KEYED_FIELD, parser.booleanValue()); + return true; + } + return false; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IPv4RangeAggregatorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IPv4RangeAggregatorBuilder.java deleted file mode 100644 index 6bafa6d566..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IPv4RangeAggregatorBuilder.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.bucket.range.ipv4; - -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.network.Cidrs; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.aggregations.AggregatorFactory; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; -import org.elasticsearch.search.aggregations.bucket.range.AbstractRangeBuilder; -import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator; -import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; -import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.Objects; - -public class IPv4RangeAggregatorBuilder extends AbstractRangeBuilder<IPv4RangeAggregatorBuilder, IPv4RangeAggregatorBuilder.Range> { - public static final String NAME = InternalIPv4Range.TYPE.name(); - public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); - - public IPv4RangeAggregatorBuilder(String name) { - super(name, InternalIPv4Range.FACTORY); - } - - /** - * Read from a stream. - */ - public IPv4RangeAggregatorBuilder(StreamInput in) throws IOException { - super(in, InternalIPv4Range.FACTORY, Range::new); - } - - @Override - public String getWriteableName() { - return NAME; - } - - /** - * Add a new range to this aggregation. - * - * @param key - * the key to use for this range in the response - * @param from - * the lower bound on the distances, inclusive - * @param to - * the upper bound on the distances, exclusive - */ - public IPv4RangeAggregatorBuilder addRange(String key, String from, String to) { - addRange(new Range(key, from, to)); - return this; - } - - /** - * Same as {@link #addMaskRange(String, String)} but uses the mask itself as - * a key. - */ - public IPv4RangeAggregatorBuilder addMaskRange(String key, String mask) { - return addRange(new Range(key, mask)); - } - - /** - * Same as {@link #addMaskRange(String, String)} but uses the mask itself as - * a key. - */ - public IPv4RangeAggregatorBuilder addMaskRange(String mask) { - return addRange(new Range(mask, mask)); - } - - /** - * Same as {@link #addRange(String, String, String)} but the key will be - * automatically generated. - */ - public IPv4RangeAggregatorBuilder addRange(String from, String to) { - return addRange(null, from, to); - } - - /** - * Same as {@link #addRange(String, String, String)} but there will be no - * lower bound. - */ - public IPv4RangeAggregatorBuilder addUnboundedTo(String key, String to) { - addRange(new Range(key, null, to)); - return this; - } - - /** - * Same as {@link #addUnboundedTo(String, String)} but the key will be - * generated automatically. - */ - public IPv4RangeAggregatorBuilder addUnboundedTo(String to) { - return addUnboundedTo(null, to); - } - - /** - * Same as {@link #addRange(String, String, String)} but there will be no - * upper bound. - */ - public IPv4RangeAggregatorBuilder addUnboundedFrom(String key, String from) { - addRange(new Range(key, from, null)); - return this; - } - - /** - * Same as {@link #addUnboundedFrom(String, String)} but the key will be - * generated automatically. - */ - public IPv4RangeAggregatorBuilder addUnboundedFrom(String from) { - return addUnboundedFrom(null, from); - } - - @Override - protected Ipv4RangeAggregatorFactory innerBuild(AggregationContext context, ValuesSourceConfig<Numeric> config, - AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException { - return new Ipv4RangeAggregatorFactory(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder, - metaData); - } - - public static class Range extends RangeAggregator.Range { - static final ParseField MASK_FIELD = new ParseField("mask"); - - private final String cidr; - - public Range(String key, Double from, Double to) { - this(key, from, null, to, null, null); - } - - public Range(String key, String from, String to) { - this(key, null, from, null, to, null); - } - - public Range(String key, String cidr) { - this(key, null, null, null, null, cidr); - } - - private Range(String key, Double from, String fromAsStr, Double to, String toAsStr, String cidr) { - super(key, from, fromAsStr, to, toAsStr); - this.cidr = cidr; - } - - /** - * Read from a stream. - */ - public Range(StreamInput in) throws IOException { - super(in); - cidr = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalString(cidr); - } - - public String mask() { - return cidr; - } - - @Override - public Range process(DocValueFormat parser, SearchContext context) { - assert parser != null; - Double from = this.from; - Double to = this.to; - String key = this.key; - if (fromAsStr != null) { - from = parser.parseDouble(fromAsStr, false, context.nowCallable()); - } - if (toAsStr != null) { - to = parser.parseDouble(toAsStr, false, context.nowCallable()); - } - if (cidr != null) { - long[] fromTo = Cidrs.cidrMaskToMinMax(cidr); - from = fromTo[0] == 0 ? Double.NEGATIVE_INFINITY : fromTo[0]; - to = fromTo[1] == InternalIPv4Range.MAX_IP ? Double.POSITIVE_INFINITY : fromTo[1]; - if (this.key == null) { - key = cidr; - } - } - return new Range(key, from, to); - } - - public static Range fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { - XContentParser.Token token; - String currentFieldName = null; - double from = Double.NEGATIVE_INFINITY; - String fromAsStr = null; - double to = Double.POSITIVE_INFINITY; - String toAsStr = null; - String key = null; - String cidr = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) { - from = parser.doubleValue(); - } else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) { - to = parser.doubleValue(); - } - } else if (token == XContentParser.Token.VALUE_STRING) { - if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) { - fromAsStr = parser.text(); - } else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) { - toAsStr = parser.text(); - } else if (parseFieldMatcher.match(currentFieldName, KEY_FIELD)) { - key = parser.text(); - } else if (parseFieldMatcher.match(currentFieldName, MASK_FIELD)) { - cidr = parser.text(); - } - } - } - return new Range(key, from, fromAsStr, to, toAsStr, cidr); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (key != null) { - builder.field(KEY_FIELD.getPreferredName(), key); - } - if (cidr != null) { - builder.field(MASK_FIELD.getPreferredName(), cidr); - } else { - if (Double.isFinite(from)) { - builder.field(FROM_FIELD.getPreferredName(), from); - } - if (Double.isFinite(to)) { - builder.field(TO_FIELD.getPreferredName(), to); - } - if (fromAsStr != null) { - builder.field(FROM_FIELD.getPreferredName(), fromAsStr); - } - if (toAsStr != null) { - builder.field(TO_FIELD.getPreferredName(), toAsStr); - } - } - builder.endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), cidr); - } - - @Override - public boolean equals(Object obj) { - return super.equals(obj) - && Objects.equals(cidr, ((Range) obj).cidr); - } - - } - -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java deleted file mode 100644 index f58df79c16..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.bucket.range.ipv4; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationStreams; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; -import org.elasticsearch.search.aggregations.bucket.BucketStreams; -import org.elasticsearch.search.aggregations.bucket.range.InternalRange; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.support.ValueType; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * - */ -public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket, InternalIPv4Range> { - public static final long MAX_IP = 1L << 32; - - public final static Type TYPE = new Type("ip_range", "iprange"); - - private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalIPv4Range readResult(StreamInput in) throws IOException { - InternalIPv4Range range = new InternalIPv4Range(); - range.readFrom(in); - return range; - } - }; - - private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() { - @Override - public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { - Bucket buckets = new Bucket(context.keyed()); - buckets.readFrom(in); - return buckets; - } - - @Override - public BucketStreamContext getBucketStreamContext(Bucket bucket) { - BucketStreamContext context = new BucketStreamContext(); - context.keyed(bucket.keyed()); - return context; - } - }; - - public static void registerStream() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); - } - - public static final Factory FACTORY = new Factory(); - - public static class Bucket extends InternalRange.Bucket { - - public Bucket(boolean keyed) { - super(keyed, DocValueFormat.IP); - } - - public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed) { - super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, DocValueFormat.IP); - } - - public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed) { - super(key, from, to, docCount, aggregations, keyed, DocValueFormat.IP); - } - - @Override - public String getFromAsString() { - double from = ((Number) this.from).doubleValue(); - return Double.isInfinite(from) ? null : from == 0 ? null : DocValueFormat.IP.format(from); - } - - @Override - public String getToAsString() { - double to = ((Number) this.to).doubleValue(); - return Double.isInfinite(to) ? null : MAX_IP == to ? null : DocValueFormat.IP.format(to); - } - - @Override - protected InternalRange.Factory<Bucket, ?> getFactory() { - return FACTORY; - } - - boolean keyed() { - return keyed; - } - } - - public static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> { - - @Override - public Type type() { - return TYPE; - } - - @Override - public ValueType getValueType() { - return ValueType.IP; - } - - @Override - public InternalIPv4Range create(String name, List<Bucket> ranges, DocValueFormat formatter, boolean keyed, - List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { - return new InternalIPv4Range(name, ranges, keyed, pipelineAggregators, metaData); - } - - @Override - public InternalIPv4Range create(List<Bucket> ranges, InternalIPv4Range prototype) { - return new InternalIPv4Range(prototype.name, ranges, prototype.keyed, prototype.pipelineAggregators(), prototype.metaData); - } - - @Override - public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, - DocValueFormat formatter) { - return new Bucket(key, from, to, docCount, aggregations, keyed); - } - - @Override - public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { - return new Bucket(prototype.getKey(), ((Number) prototype.getFrom()).doubleValue(), ((Number) prototype.getTo()).doubleValue(), - prototype.getDocCount(), aggregations, prototype.getKeyed()); - } - } - - public InternalIPv4Range() {} // for serialization - - public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed, List<PipelineAggregator> pipelineAggregators, - Map<String, Object> metaData) { - super(name, ranges, DocValueFormat.IP, keyed, pipelineAggregators, metaData); - } - - @Override - public Type type() { - return TYPE; - } - - @Override - public InternalRange.Factory<Bucket, InternalIPv4Range> getFactory() { - return FACTORY; - } -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IpRangeParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IpRangeParser.java deleted file mode 100644 index 901300c3bf..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/IpRangeParser.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.bucket.range.ipv4; - -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator; -import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator.Range; -import org.elasticsearch.search.aggregations.bucket.range.RangeParser; -import org.elasticsearch.search.aggregations.support.ValueType; -import org.elasticsearch.search.aggregations.support.ValuesSourceType; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * - */ -public class IpRangeParser extends RangeParser { - - public IpRangeParser() { - super(true, false, false); - } - - @Override - protected Range parseRange(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { - return IPv4RangeAggregatorBuilder.Range.fromXContent(parser, parseFieldMatcher); - } - - @Override - protected IPv4RangeAggregatorBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType, - ValueType targetValueType, Map<ParseField, Object> otherOptions) { - IPv4RangeAggregatorBuilder factory = new IPv4RangeAggregatorBuilder(aggregationName); - @SuppressWarnings("unchecked") - List<IPv4RangeAggregatorBuilder.Range> ranges = (List<IPv4RangeAggregatorBuilder.Range>) otherOptions - .get(RangeAggregator.RANGES_FIELD); - for (IPv4RangeAggregatorBuilder.Range range : ranges) { - factory.addRange(range); - } - Boolean keyed = (Boolean) otherOptions.get(RangeAggregator.KEYED_FIELD); - if (keyed != null) { - factory.keyed(keyed); - } - return factory; - } -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/Ipv4RangeAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/Ipv4RangeAggregatorFactory.java deleted file mode 100644 index 1c059356c5..0000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/Ipv4RangeAggregatorFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.bucket.range.ipv4; - -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.AggregatorFactory; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; -import org.elasticsearch.search.aggregations.bucket.range.AbstractRangeAggregatorFactory; -import org.elasticsearch.search.aggregations.bucket.range.InternalRange.Factory; -import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; -import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -public class Ipv4RangeAggregatorFactory - extends AbstractRangeAggregatorFactory<Ipv4RangeAggregatorFactory, IPv4RangeAggregatorBuilder.Range> { - - public Ipv4RangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, - List<IPv4RangeAggregatorBuilder.Range> ranges, boolean keyed, Factory<?, ?> rangeFactory, AggregationContext context, - AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException { - super(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder, metaData); - } - -} |