summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/search/aggregations/metrics
diff options
context:
space:
mode:
author Nicholas Knize <nknize@gmail.com> 2015-09-16 16:54:54 -0500
committer Nicholas Knize <nknize@gmail.com> 2015-10-14 16:19:09 -0500
commit b31d3ddd3e5b13a73d56fb6ba9e08feebad793e4 (patch)
tree 2bbda1bbf259058a8478c3ca473383df3a8fdcf6 /core/src/main/java/org/elasticsearch/search/aggregations/metrics
parent 5d001d15781fc56b3bf6e7e54f2d0d736cd602e4 (diff)
Adds geo_centroid metric aggregator
This commit adds a new metric aggregator for computing the geo_centroid over a set of geo_point fields. This can be combined with other aggregators (e.g., geohash_grid, significant_terms) for computing the geospatial centroid based on the document sets from other aggregation results.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/search/aggregations/metrics')
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java31
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java144
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java33
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java63
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java154
5 files changed, 425 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java
new file mode 100644
index 0000000000..2cdf462f04
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroid.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.search.aggregations.Aggregation;
+
+/**
+ * Interface for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ * <p>
+ * Read-only view of a geo-centroid aggregation result: the centroid location and the number of
+ * points that contributed to it.
+ */
+public interface GeoCentroid extends Aggregation {
+ /**
+  * @return the computed centroid; the implementation added alongside this interface
+  *         ({@code InternalGeoCentroid}) returns {@code null} when no centroid is available
+  */
+ GeoPoint centroid();
+ /**
+  * @return the number of geo points that were aggregated into the centroid
+  */
+ long count();
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
new file mode 100644
index 0000000000..e0a307e8c0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.XGeoHashUtils;
+import org.apache.lucene.util.XGeoUtils;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.search.aggregations.metrics.geobounds.InternalGeoBounds;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A geo metric aggregator that computes a geo-centroid from a {@code geo_point} type field.
+ * <p>
+ * For every bucket the aggregator tracks how many points have been collected and the running
+ * mean of their longitudes and latitudes. The running mean is kept morton-encoded in a single
+ * {@code long} per bucket.
+ * <p>
+ * NOTE(review): the running mean is re-encoded into a {@code long} after every collected
+ * document, so it is rounded to the encoding's resolution each time. Keeping the coordinates
+ * as doubles would avoid the accumulated rounding - confirm whether the loss matters here.
+ */
+public final class GeoCentroidAggregator extends MetricsAggregator {
+    /** Source of geo points; {@code null} when the aggregated field is unmapped. */
+    private final ValuesSource.GeoPoint valuesSource;
+    /** Per-bucket running centroid, morton-encoded; only allocated when a values source exists. */
+    LongArray centroids;
+    /** Per-bucket number of points collected so far; only allocated when a values source exists. */
+    LongArray counts;
+
+    /**
+     * @param valuesSource the geo point source, or {@code null} for an unmapped field, in which
+     *                     case nothing is allocated and only empty aggregations are built
+     */
+    protected GeoCentroidAggregator(String name, AggregationContext aggregationContext, Aggregator parent,
+                                    ValuesSource.GeoPoint valuesSource, List<PipelineAggregator> pipelineAggregators,
+                                    Map<String, Object> metaData) throws IOException {
+        super(name, aggregationContext, parent, pipelineAggregators, metaData);
+        this.valuesSource = valuesSource;
+        if (valuesSource != null) {
+            final BigArrays bigArrays = context.bigArrays();
+            centroids = bigArrays.newLongArray(1, true);
+            counts = bigArrays.newLongArray(1, true);
+        }
+    }
+
+    /**
+     * Returns a collector that folds every geo point of each collected document into the
+     * running centroid of the target bucket, or a no-op collector for an unmapped field.
+     */
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        if (valuesSource == null) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        final BigArrays bigArrays = context.bigArrays();
+        final MultiGeoPointValues values = valuesSource.geoPointValues(ctx);
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                // make sure both arrays can hold the bucket ordinal before touching it
+                centroids = bigArrays.grow(centroids, bucket + 1);
+                counts = bigArrays.grow(counts, bucket + 1);
+
+                values.setDocument(doc);
+                final int valueCount = values.count();
+                if (valueCount == 0) {
+                    return;
+                }
+
+                // number of points already folded into this bucket's running mean
+                long seen = counts.get(bucket);
+                double lon = 0d;
+                double lat = 0d;
+                if (seen > 0) {
+                    // resume from the previously stored running mean
+                    final GeoPoint previous = GeoPoint.fromIndexLong(centroids.get(bucket));
+                    lon = previous.lon();
+                    lat = previous.lat();
+                }
+                // incremental mean: mean += (x - mean) / n
+                for (int i = 0; i < valueCount; ++i) {
+                    final GeoPoint value = values.valueAt(i);
+                    ++seen;
+                    lon = lon + (value.getLon() - lon) / seen;
+                    lat = lat + (value.getLat() - lat) / seen;
+                }
+                counts.increment(bucket, valueCount);
+                centroids.set(bucket, XGeoUtils.mortonHash(lon, lat));
+            }
+        };
+    }
+
+    /**
+     * Builds the result for {@code bucket}. A bucket that never received a point yields a
+     * {@code null} centroid with a count of zero, the same shape as
+     * {@link #buildEmptyAggregation()}, so that serialization (which only transports a centroid
+     * for a positive count) stays symmetric.
+     */
+    @Override
+    public InternalAggregation buildAggregation(long bucket) {
+        if (valuesSource == null || bucket >= centroids.size()) {
+            return buildEmptyAggregation();
+        }
+        final long bucketCount = counts.get(bucket);
+        final GeoPoint bucketCentroid = (bucketCount > 0) ? GeoPoint.fromIndexLong(centroids.get(bucket)) : null;
+        return new InternalGeoCentroid(name, bucketCentroid, bucketCount, pipelineAggregators(), metaData());
+    }
+
+    /** Builds a result with no centroid and a count of zero. */
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalGeoCentroid(name, null, 0L, pipelineAggregators(), metaData());
+    }
+
+    /** Releases the per-bucket arrays. */
+    @Override
+    public void doClose() {
+        Releasables.close(centroids, counts);
+    }
+
+    /**
+     * Factory creating {@link GeoCentroidAggregator} instances for mapped and unmapped fields.
+     */
+    public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.GeoPoint> {
+        protected Factory(String name, ValuesSourceConfig<ValuesSource.GeoPoint> config) {
+            // register under this aggregation's own type name (was the geo_bounds type name)
+            super(name, InternalGeoCentroid.TYPE.name(), config);
+        }
+
+        /** Creates an aggregator without a values source; it only produces empty results. */
+        @Override
+        protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
+                                            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+            return new GeoCentroidAggregator(name, aggregationContext, parent, null, pipelineAggregators, metaData);
+        }
+
+        /** Creates an aggregator reading geo points from {@code valuesSource}. */
+        @Override
+        protected Aggregator doCreateInternal(ValuesSource.GeoPoint valuesSource, AggregationContext aggregationContext, Aggregator parent,
+                                              boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+                throws IOException {
+            return new GeoCentroidAggregator(name, aggregationContext, parent, valuesSource, pipelineAggregators, metaData);
+        }
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java
new file mode 100644
index 0000000000..9d6823c675
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.search.aggregations.metrics.ValuesSourceMetricsAggregationBuilder;
+
+/**
+ * Builder class for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ * <p>
+ * Adds no options of its own; it only passes the geo-centroid type name to the parent builder.
+ */
+public class GeoCentroidBuilder extends ValuesSourceMetricsAggregationBuilder<GeoCentroidBuilder> {
+
+ /**
+  * Creates a builder for a geo-centroid aggregation.
+  *
+  * @param name the name passed through to the parent builder
+  */
+ public GeoCentroidBuilder(String name) {
+ super(name, InternalGeoCentroid.TYPE.name());
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java
new file mode 100644
index 0000000000..49a7bc8e96
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidParser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+
+/**
+ * Parser class for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ * <p>
+ * Every token that is not a field name is handed to a geo-point values-source parser; a token
+ * that parser does not accept is rejected with a {@link SearchParseException}.
+ */
+public class GeoCentroidParser implements Aggregator.Parser {
+
+    /** @return the type name under which this parser is registered */
+    @Override
+    public String type() {
+        return InternalGeoCentroid.TYPE.name();
+    }
+
+    /**
+     * Consumes the aggregation body up to its closing {@code END_OBJECT} token and creates the
+     * aggregator factory from the collected values-source configuration.
+     *
+     * @throws SearchParseException if a token is not accepted by the values-source parser
+     */
+    @Override
+    public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
+        final ValuesSourceParser<ValuesSource.GeoPoint> sourceParser =
+                ValuesSourceParser.geoPoint(aggregationName, InternalGeoCentroid.TYPE, context)
+                        .targetValueType(ValueType.GEOPOINT)
+                        .formattable(true)
+                        .build();
+
+        // name of the most recently seen field; stays null until the first FIELD_NAME token
+        String fieldName = null;
+        for (XContentParser.Token current = parser.nextToken();
+             current != XContentParser.Token.END_OBJECT;
+             current = parser.nextToken()) {
+            if (current == XContentParser.Token.FIELD_NAME) {
+                fieldName = parser.currentName();
+                continue;
+            }
+            final boolean consumed = sourceParser.token(fieldName, current, parser);
+            if (!consumed) {
+                throw new SearchParseException(context, "Unknown key for a " + current + " in aggregation [" + aggregationName + "]: ["
+                        + fieldName + "].", parser.getTokenLocation());
+            }
+        }
+        return new GeoCentroidAggregator.Factory(aggregationName, sourceParser.config());
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java
new file mode 100644
index 0000000000..192e6438ba
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.geocentroid;
+
+import org.apache.lucene.util.XGeoUtils;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization and merge logic for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
+ * <p>
+ * Holds a centroid together with the number of points it was computed from. On the wire the
+ * centroid is transported as a morton-encoded {@code long}, and only when the count is positive.
+ */
+public class InternalGeoCentroid extends InternalMetricsAggregation implements GeoCentroid {
+
+    public final static Type TYPE = new Type("geo_centroid");
+    /** Stream reader that creates an empty instance and populates it from the input. */
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalGeoCentroid readResult(StreamInput in) throws IOException {
+            InternalGeoCentroid result = new InternalGeoCentroid();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    /** Registers {@link #STREAM} under this aggregation's stream type. */
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    /** The centroid, or {@code null} when there is none. */
+    protected GeoPoint centroid;
+    /** Number of points the centroid was computed from. */
+    protected long count;
+
+    /** Used by {@link #STREAM}; fields are filled in by {@link #doReadFrom(StreamInput)}. */
+    protected InternalGeoCentroid() {
+    }
+
+    /**
+     * @param centroid the centroid, may be {@code null} when no point was aggregated
+     * @param count    the number of points {@code centroid} was computed from
+     */
+    public InternalGeoCentroid(String name, GeoPoint centroid, long count, List<PipelineAggregator>
+            pipelineAggregators, Map<String, Object> metaData) {
+        super(name, pipelineAggregators, metaData);
+        this.centroid = centroid;
+        this.count = count;
+    }
+
+    /**
+     * @return the centroid, or {@code null} when none is set or its longitude is NaN
+     */
+    @Override
+    public GeoPoint centroid() {
+        return (centroid == null || Double.isNaN(centroid.lon()) ? null : centroid);
+    }
+
+    @Override
+    public long count() {
+        return count;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    /**
+     * Merges partial results into one: the merged centroid is the count-weighted mean of the
+     * partial centroids. Partial results with a count of zero are ignored; when no partial
+     * result has points, the merged centroid is {@code null} and the count is zero.
+     */
+    @Override
+    public InternalGeoCentroid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        double lonSum = 0d;
+        double latSum = 0d;
+        // long, not int: counts are longs and "+=" into an int would silently truncate
+        long totalCount = 0L;
+        for (InternalAggregation aggregation : aggregations) {
+            final InternalGeoCentroid partial = (InternalGeoCentroid) aggregation;
+            if (partial.count > 0) {
+                totalCount += partial.count;
+                lonSum += partial.count * partial.centroid.getLon();
+                latSum += partial.count * partial.centroid.getLat();
+            }
+        }
+        final GeoPoint result = (totalCount > 0) ? new GeoPoint(latSum / totalCount, lonSum / totalCount) : null;
+        return new InternalGeoCentroid(name, result, totalCount, pipelineAggregators(), getMetaData());
+    }
+
+    /**
+     * Resolves a property path: an empty path yields this aggregation, {@code "value"} the
+     * centroid, {@code "lat"} / {@code "lon"} the respective coordinate.
+     *
+     * @return the requested property; {@code null} for {@code "value"}, {@code "lat"} and
+     *         {@code "lon"} when no centroid is set
+     * @throws IllegalArgumentException for an unknown element or a path longer than one element
+     */
+    @Override
+    public Object getProperty(List<String> path) {
+        if (path.isEmpty()) {
+            return this;
+        } else if (path.size() == 1) {
+            String coordinate = path.get(0);
+            switch (coordinate) {
+                case "value":
+                    return centroid;
+                case "lat":
+                    // an empty result has no centroid; report "no value" instead of failing with an NPE
+                    if (centroid == null) {
+                        return null;
+                    }
+                    return centroid.lat();
+                case "lon":
+                    if (centroid == null) {
+                        return null;
+                    }
+                    return centroid.lon();
+                default:
+                    throw new IllegalArgumentException("Found unknown path element [" + coordinate + "] in [" + getName() + "]");
+            }
+        } else {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        }
+    }
+
+    /** Reads the count and, only when it is positive, the morton-encoded centroid. */
+    @Override
+    protected void doReadFrom(StreamInput in) throws IOException {
+        count = in.readVLong();
+        if (count > 0) {
+            centroid = GeoPoint.fromIndexLong(in.readLong());
+        }
+    }
+
+    /**
+     * Writes the count and, only when it is positive, the morton-encoded centroid. The condition
+     * mirrors {@link #doReadFrom(StreamInput)}: keying on {@code centroid != null} instead would
+     * write a long the reader never consumes whenever a centroid object accompanies a zero count.
+     *
+     * @throws IllegalStateException if the count is positive but no centroid is set
+     */
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeVLong(count);
+        if (count > 0) {
+            if (centroid == null) {
+                throw new IllegalStateException("geo_centroid [" + getName() + "] has count [" + count + "] but no centroid");
+            }
+            out.writeLong(XGeoUtils.mortonHash(centroid.lon(), centroid.lat()));
+        }
+    }
+
+    static class Fields {
+        public static final XContentBuilderString CENTROID = new XContentBuilderString("location");
+    }
+
+    /**
+     * Renders the centroid as a {@code location} object with {@code lat} and {@code lon} fields.
+     * Nothing is rendered when {@link #centroid()} reports no centroid.
+     */
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        final GeoPoint point = centroid();
+        if (point != null) {
+            builder.startObject(Fields.CENTROID).field("lat", point.lat()).field("lon", point.lon()).endObject();
+        }
+        return builder;
+    }
+}