summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
diff options
context:
space:
mode:
authorNicholas Knize <nknize@gmail.com>2015-09-16 16:54:54 -0500
committerNicholas Knize <nknize@gmail.com>2015-10-14 16:19:09 -0500
commitb31d3ddd3e5b13a73d56fb6ba9e08feebad793e4 (patch)
tree2bbda1bbf259058a8478c3ca473383df3a8fdcf6 /core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
parent5d001d15781fc56b3bf6e7e54f2d0d736cd602e4 (diff)
Adds geo_centroid metric aggregator
This commit adds a new metric aggregator for computing the geo_centroid over a set of geo_point fields. This can be combined with other aggregators (e.g., geohash_grid, significant_terms) for computing the geospatial centroid based on the document sets from other aggregation results.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java')
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java144
1 files changed, 144 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
new file mode 100644
index 0000000000..e0a307e8c0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.XGeoHashUtils;
+import org.apache.lucene.util.XGeoUtils;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.search.aggregations.metrics.geobounds.InternalGeoBounds;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A geo metric aggregator that computes a geo-centroid from a {@code geo_point} type field
+ */
+public final class GeoCentroidAggregator extends MetricsAggregator {
+ private final ValuesSource.GeoPoint valuesSource;
+ LongArray centroids;
+ LongArray counts;
+
+ protected GeoCentroidAggregator(String name, AggregationContext aggregationContext, Aggregator parent,
+ ValuesSource.GeoPoint valuesSource, List<PipelineAggregator> pipelineAggregators,
+ Map<String, Object> metaData) throws IOException {
+ super(name, aggregationContext, parent, pipelineAggregators, metaData);
+ this.valuesSource = valuesSource;
+ if (valuesSource != null) {
+ final BigArrays bigArrays = context.bigArrays();
+ centroids = bigArrays.newLongArray(1, true);
+ counts = bigArrays.newLongArray(1, true);
+ }
+ }
+
+ @Override
+ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+ if (valuesSource == null) {
+ return LeafBucketCollector.NO_OP_COLLECTOR;
+ }
+ final BigArrays bigArrays = context.bigArrays();
+ final MultiGeoPointValues values = valuesSource.geoPointValues(ctx);
+ return new LeafBucketCollectorBase(sub, values) {
+ @Override
+ public void collect(int doc, long bucket) throws IOException {
+ centroids = bigArrays.grow(centroids, bucket + 1);
+ counts = bigArrays.grow(counts, bucket + 1);
+
+ values.setDocument(doc);
+ final int valueCount = values.count();
+ if (valueCount > 0) {
+ double[] pt = new double[2];
+ // get the previously accumulated number of counts
+ long prevCounts = counts.get(bucket);
+ // increment by the number of points for this document
+ counts.increment(bucket, valueCount);
+ // get the previous GeoPoint if a moving avg was computed
+ if (prevCounts > 0) {
+ final GeoPoint centroid = GeoPoint.fromIndexLong(centroids.get(bucket));
+ pt[0] = centroid.lon();
+ pt[1] = centroid.lat();
+ }
+ // update the moving average
+ for (int i = 0; i < valueCount; ++i) {
+ GeoPoint value = values.valueAt(i);
+ pt[0] = pt[0] + (value.getLon() - pt[0]) / ++prevCounts;
+ pt[1] = pt[1] + (value.getLat() - pt[1]) / prevCounts;
+ }
+ centroids.set(bucket, XGeoUtils.mortonHash(pt[0], pt[1]));
+ }
+ }
+ };
+ }
+
+ @Override
+ public InternalAggregation buildAggregation(long bucket) {
+ if (valuesSource == null || bucket >= centroids.size()) {
+ return buildEmptyAggregation();
+ }
+ final long bucketCount = counts.get(bucket);
+ final GeoPoint bucketCentroid = (bucketCount > 0) ? GeoPoint.fromIndexLong(centroids.get(bucket)) :
+ new GeoPoint(Double.NaN, Double.NaN);
+ return new InternalGeoCentroid(name, bucketCentroid , bucketCount, pipelineAggregators(), metaData());
+ }
+
+ @Override
+ public InternalAggregation buildEmptyAggregation() {
+ return new InternalGeoCentroid(name, null, 0l, pipelineAggregators(), metaData());
+ }
+
+ @Override
+ public void doClose() {
+ Releasables.close(centroids, counts);
+ }
+
+ public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.GeoPoint> {
+ protected Factory(String name, ValuesSourceConfig<ValuesSource.GeoPoint> config) {
+ super(name, InternalGeoBounds.TYPE.name(), config);
+ }
+
+ @Override
+ protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
+ List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+ return new GeoCentroidAggregator(name, aggregationContext, parent, null, pipelineAggregators, metaData);
+ }
+
+ @Override
+ protected Aggregator doCreateInternal(ValuesSource.GeoPoint valuesSource, AggregationContext aggregationContext, Aggregator parent,
+ boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+ throws IOException {
+ return new GeoCentroidAggregator(name, aggregationContext, parent, valuesSource, pipelineAggregators, metaData);
+ }
+ }
+}