diff options
author | Nicholas Knize <nknize@gmail.com> | 2015-09-16 16:54:54 -0500 |
---|---|---|
committer | Nicholas Knize <nknize@gmail.com> | 2015-10-14 16:19:09 -0500 |
commit | b31d3ddd3e5b13a73d56fb6ba9e08feebad793e4 (patch) | |
tree | 2bbda1bbf259058a8478c3ca473383df3a8fdcf6 /core/src/main/java/org/elasticsearch/search/aggregations/metrics | |
parent | 5d001d15781fc56b3bf6e7e54f2d0d736cd602e4 (diff) |
Adds geo_centroid metric aggregator
This commit adds a new metric aggregator for computing the geo_centroid over a set of geo_point fields. This can be combined with other aggregators (e.g., geohash_grid, significant_terms) for computing the geospatial centroid based on the document sets from other aggregation results.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/metrics')
5 files changed, 454 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java
new file mode 100644
index 0000000000..2cdf462f04
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.search.aggregations.Aggregation;
+
+/**
+ * Result of a {@code geo_centroid} aggregation, as computed by
+ * {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}.
+ */
+public interface GeoCentroid extends Aggregation {
+
+    /**
+     * @return the centroid of the aggregated points, or {@code null} if no points were aggregated
+     */
+    GeoPoint centroid();
+
+    /**
+     * @return the number of points the centroid was computed from
+     */
+    long count();
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
new file mode 100644
index 0000000000..e0a307e8c0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.XGeoHashUtils;
+import org.apache.lucene.util.XGeoUtils;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.search.aggregations.metrics.geobounds.InternalGeoBounds;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A geo metric aggregator that computes a geo-centroid from a {@code geo_point} type field.
+ * <p>
+ * For every bucket it keeps the number of points collected so far and their running centroid,
+ * the latter stored as a morton-hashed long.
+ */
+public final class GeoCentroidAggregator extends MetricsAggregator {
+    private final ValuesSource.GeoPoint valuesSource;
+    // per-bucket state; both arrays are only allocated when a values source is present
+    LongArray centroids;
+    LongArray counts;
+
+    protected GeoCentroidAggregator(String name, AggregationContext aggregationContext, Aggregator parent,
+                                    ValuesSource.GeoPoint valuesSource, List<PipelineAggregator> pipelineAggregators,
+                                    Map<String, Object> metaData) throws IOException {
+        super(name, aggregationContext, parent, pipelineAggregators, metaData);
+        this.valuesSource = valuesSource;
+        if (valuesSource != null) {
+            final BigArrays bigArrays = context.bigArrays();
+            centroids = bigArrays.newLongArray(1, true);
+            counts = bigArrays.newLongArray(1, true);
+        }
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        if (valuesSource == null) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        final BigArrays bigArrays = context.bigArrays();
+        final MultiGeoPointValues values = valuesSource.geoPointValues(ctx);
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                centroids = bigArrays.grow(centroids, bucket + 1);
+                counts = bigArrays.grow(counts, bucket + 1);
+
+                values.setDocument(doc);
+                final int valueCount = values.count();
+                if (valueCount == 0) {
+                    return;
+                }
+                // number of points already folded into this bucket's running centroid
+                long seen = counts.get(bucket);
+                double lon = 0d;
+                double lat = 0d;
+                // resume from the stored centroid if this bucket already has one
+                if (seen > 0) {
+                    final GeoPoint centroid = GeoPoint.fromIndexLong(centroids.get(bucket));
+                    lon = centroid.lon();
+                    lat = centroid.lat();
+                }
+                // incremental mean: mean += (value - mean) / n
+                for (int i = 0; i < valueCount; ++i) {
+                    final GeoPoint value = values.valueAt(i);
+                    ++seen;
+                    lon += (value.getLon() - lon) / seen;
+                    lat += (value.getLat() - lat) / seen;
+                }
+                counts.increment(bucket, valueCount);
+                centroids.set(bucket, XGeoUtils.mortonHash(lon, lat));
+            }
+        };
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long bucket) {
+        if (valuesSource == null || bucket >= centroids.size()) {
+            return buildEmptyAggregation();
+        }
+        final long bucketCount = counts.get(bucket);
+        // a bucket that collected no points has no centroid; report null, as buildEmptyAggregation() does
+        final GeoPoint bucketCentroid = (bucketCount > 0) ? GeoPoint.fromIndexLong(centroids.get(bucket)) : null;
+        return new InternalGeoCentroid(name, bucketCentroid, bucketCount, pipelineAggregators(), metaData());
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalGeoCentroid(name, null, 0L, pipelineAggregators(), metaData());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(centroids, counts);
+    }
+
+    /**
+     * Creates {@link GeoCentroidAggregator}s for mapped and unmapped {@code geo_point} fields.
+     */
+    public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.GeoPoint> {
+        protected Factory(String name, ValuesSourceConfig<ValuesSource.GeoPoint> config) {
+            // use this aggregation's own type name rather than the geo_bounds one
+            super(name, InternalGeoCentroid.TYPE.name(), config);
+        }
+
+        @Override
+        protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
+                                            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+            // a null values source marks the field as unmapped; the aggregator then collects nothing
+            return new GeoCentroidAggregator(name, aggregationContext, parent, null, pipelineAggregators, metaData);
+        }
+
+        @Override
+        protected Aggregator doCreateInternal(ValuesSource.GeoPoint valuesSource, AggregationContext aggregationContext, Aggregator parent,
+                                              boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+                throws IOException {
+            return new GeoCentroidAggregator(name, aggregationContext, parent, valuesSource, pipelineAggregators, metaData);
+        }
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java
new file mode 100644
index 0000000000..9d6823c675
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.search.aggregations.metrics.ValuesSourceMetricsAggregationBuilder;
+
+/**
+ * Builder class for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ */
+public class GeoCentroidBuilder extends ValuesSourceMetricsAggregationBuilder<GeoCentroidBuilder> {
+
+    /**
+     * @param name the name of the aggregation to build
+     */
+    public GeoCentroidBuilder(String name) {
+        super(name, InternalGeoCentroid.TYPE.name());
+    }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java
new file mode 100644
index 0000000000..49a7bc8e96
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+
+/**
+ * Parser class for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ */
+public class GeoCentroidParser implements Aggregator.Parser {
+
+    @Override
+    public String type() {
+        return InternalGeoCentroid.TYPE.name();
+    }
+
+    @Override
+    public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
+        ValuesSourceParser<ValuesSource.GeoPoint> vsParser = ValuesSourceParser.geoPoint(aggregationName, InternalGeoCentroid.TYPE, context)
+                .targetValueType(ValueType.GEOPOINT)
+                .formattable(true)
+                .build();
+        XContentParser.Token token;
+        String currentFieldName = null;
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (vsParser.token(currentFieldName, token, parser) == false) {
+                // every token the values source parser does not consume is rejected
+                throw new SearchParseException(context, "Unknown key for a " + token + " in aggregation [" + aggregationName + "]: ["
+                        + currentFieldName + "].", parser.getTokenLocation());
+            }
+        }
+        return new GeoCentroidAggregator.Factory(aggregationName, vsParser.config());
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java
new file mode 100644
index 0000000000..192e6438ba
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.apache.lucene.util.XGeoUtils;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization and merge logic for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ */
+public class InternalGeoCentroid extends InternalMetricsAggregation implements GeoCentroid {
+
+    public final static Type TYPE = new Type("geo_centroid");
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalGeoCentroid readResult(StreamInput in) throws IOException {
+            InternalGeoCentroid result = new InternalGeoCentroid();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    /** centroid of the aggregated points; {@code null} (or a NaN point) when there are none */
+    protected GeoPoint centroid;
+    /** number of points {@link #centroid} was computed from */
+    protected long count;
+
+    protected InternalGeoCentroid() {
+    }
+
+    public InternalGeoCentroid(String name, GeoPoint centroid, long count, List<PipelineAggregator>
+            pipelineAggregators, Map<String, Object> metaData) {
+        super(name, pipelineAggregators, metaData);
+        this.centroid = centroid;
+        this.count = count;
+    }
+
+    @Override
+    public GeoPoint centroid() {
+        return (centroid == null || Double.isNaN(centroid.lon()) ? null : centroid);
+    }
+
+    @Override
+    public long count() {
+        return count;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    /**
+     * Merges the given aggregations into one by averaging their centroids weighted by their point counts.
+     */
+    @Override
+    public InternalGeoCentroid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        double lonSum = 0d;
+        double latSum = 0d;
+        // long, like the per-aggregation counts, so the total is not silently narrowed to an int
+        long totalCount = 0;
+        for (InternalAggregation aggregation : aggregations) {
+            InternalGeoCentroid centroidAgg = (InternalGeoCentroid) aggregation;
+            if (centroidAgg.count > 0) {
+                totalCount += centroidAgg.count;
+                lonSum += (centroidAgg.count * centroidAgg.centroid.getLon());
+                latSum += (centroidAgg.count * centroidAgg.centroid.getLat());
+            }
+        }
+        final GeoPoint result = (totalCount > 0) ? new GeoPoint(latSum / totalCount, lonSum / totalCount) : null;
+        return new InternalGeoCentroid(name, result, totalCount, pipelineAggregators(), getMetaData());
+    }
+
+    @Override
+    public Object getProperty(List<String> path) {
+        if (path.isEmpty()) {
+            return this;
+        } else if (path.size() == 1) {
+            // go through centroid() so that an absent (null or NaN) centroid is handled in one place
+            final GeoPoint point = centroid();
+            String coordinate = path.get(0);
+            switch (coordinate) {
+                case "value":
+                    return point;
+                case "lat":
+                    return point == null ? Double.NaN : point.lat();
+                case "lon":
+                    return point == null ? Double.NaN : point.lon();
+                default:
+                    throw new IllegalArgumentException("Found unknown path element [" + coordinate + "] in [" + getName() + "]");
+            }
+        } else {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        }
+    }
+
+    @Override
+    protected void doReadFrom(StreamInput in) throws IOException {
+        count = in.readVLong();
+        if (count > 0) {
+            centroid = GeoPoint.fromIndexLong(in.readLong());
+        }
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeVLong(count);
+        // must mirror doReadFrom, which only expects a centroid when count > 0; keying this on
+        // centroid != null would write a long the reader never consumes for an empty, non-null centroid
+        if (count > 0) {
+            out.writeLong(XGeoUtils.mortonHash(centroid.lon(), centroid.lat()));
+        }
+    }
+
+    static class Fields {
+        public static final XContentBuilderString CENTROID = new XContentBuilderString("location");
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        // centroid() filters out NaN points, which must not be rendered
+        final GeoPoint point = centroid();
+        if (point != null) {
+            builder.startObject(Fields.CENTROID).field("lat", point.lat()).field("lon", point.lon()).endObject();
+        }
+        return builder;
+    }
+}