aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Storona <vstorona@cybervisiontech.com>2016-11-25 20:28:02 +0200
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2017-11-13 11:04:54 +0200
commit496c97d14eb428a5aff74e82d662a0da6930e94f (patch)
treeebb2588ad43ea0f68acc3e9633f369ff423b5617
parent29e054769ea1cfd36f08df991d6263781374c92c (diff)
DRILL-5337: OpenTSDB storage plugin
closes #774
-rw-r--r--contrib/pom.xml1
-rw-r--r--contrib/storage-opentsdb/README.md69
-rw-r--r--contrib/storage-opentsdb/pom.xml80
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java32
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java81
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java53
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java169
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java258
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java42
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java77
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java77
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java132
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java66
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java50
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java28
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java124
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java55
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java148
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java187
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java174
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java63
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java77
-rw-r--r--contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java77
-rw-r--r--contrib/storage-opentsdb/src/main/resources/bootstrap-storage-plugins.json9
-rw-r--r--contrib/storage-opentsdb/src/main/resources/drill-module.conf21
-rw-r--r--contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestDataHolder.java247
-rw-r--r--contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java189
-rw-r--r--distribution/pom.xml5
28 files changed, 2591 insertions, 0 deletions
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 20149237e..d4ad4340c 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -38,6 +38,7 @@
<module>storage-mongo</module>
<module>storage-jdbc</module>
<module>storage-kudu</module>
+ <module>storage-opentsdb</module>
<module>sqlline</module>
<module>data</module>
<module>gis</module>
diff --git a/contrib/storage-opentsdb/README.md b/contrib/storage-opentsdb/README.md
new file mode 100644
index 000000000..0c616b505
--- /dev/null
+++ b/contrib/storage-opentsdb/README.md
@@ -0,0 +1,69 @@
+# drill-storage-openTSDB
+
+Implementation of TSDB storage plugin. Plugin uses REST API to work with TSDB.
+
+For more information about openTSDB follow this link <http://opentsdb.net>
+
+There is list of required params:
+
+* metric - The name of a metric stored in the db.
+
+* start - The start time for the query. This can be a relative or absolute timestamp.
+
+* aggregator - The name of an aggregation function to use.
+
+optional param is:
+
+* downsample - An optional downsampling function to reduce the amount of data returned.
+
+* end - An end time for the query. If not supplied, the TSD will assume the local system time on the server.
+This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null
+to the db in this field, but in this case db will assume the local system time on the server.
+
+List of supported aggregators
+
+<http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html>
+
+List of supported time
+
+<http://opentsdb.net/docs/build/html/user_guide/query/dates.html>
+
+Params must be specified in FROM clause of the query separated by commas. For example
+
+`openTSDB.(metric=metric_name, start=4d-ago, aggregator=sum)`
+
+Supported queries for now are listed below:
+
+```
+USE openTSDB
+```
+
+```
+SHOW tables
+```
+Will print available metrics. Max number of the printed results is a Integer.MAX value
+
+```
+SELECT * FROM openTSDB. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)`
+```
+Return aggregated elements from `warp.speed.test` table since 47y-ago
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
+```
+Return aggregated elements from `warp.speed.test` table
+
+```
+SELECT `timestamp`, sum(`aggregated value`) FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
+```
+Return aggregated and grouped value by standard drill functions from `warp.speed.test table`, but with the custom aggregator
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
+```
+Return aggregated data limited by downsample
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
+```
+Return aggregated data limited by end time \ No newline at end of file
diff --git a/contrib/storage-opentsdb/pom.xml b/contrib/storage-opentsdb/pom.xml
new file mode 100644
index 000000000..aff1bfaa0
--- /dev/null
+++ b/contrib/storage-opentsdb/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.12.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-opentsdb-storage</artifactId>
+
+ <name>contrib/opentsdb-storage-plugin</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-standalone</artifactId>
+ <version>2.5.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.retrofit2</groupId>
+ <artifactId>retrofit</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.retrofit2</groupId>
+ <artifactId>converter-jackson</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.madhukaraphatak</groupId>
+ <artifactId>java-sizeof_2.11</artifactId>
+ <version>0.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
new file mode 100644
index 000000000..c812ff5b6
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+public interface Constants {
+ /**
+ * openTSDB required constants for API call
+ */
+ public static final String DEFAULT_TIME = "47y-ago";
+ public static final String SUM_AGGREGATOR = "sum";
+
+ public static final String TIME_PARAM = "start";
+ public static final String END_TIME_PARAM = "end";
+ public static final String METRIC_PARAM = "metric";
+ public static final String AGGREGATOR_PARAM = "aggregator";
+ public static final String DOWNSAMPLE_PARAM = "downsample";
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java
new file mode 100644
index 000000000..bdbb670fb
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Schema;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.DOUBLE;
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.STRING;
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.TIMESTAMP;
+
+public class DrillOpenTSDBTable extends DynamicDrillTable {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(DrillOpenTSDBTable.class);
+
+ private final Schema schema;
+
+ public DrillOpenTSDBTable(String storageEngineName, OpenTSDBStoragePlugin plugin, Schema schema, OpenTSDBScanSpec scanSpec) {
+ super(plugin, storageEngineName, scanSpec);
+ this.schema = schema;
+ }
+
+ @Override
+ public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+ List<String> names = Lists.newArrayList();
+ List<RelDataType> types = Lists.newArrayList();
+ convertToRelDataType(typeFactory, names, types);
+ return typeFactory.createStructType(types, names);
+ }
+
+ private void convertToRelDataType(RelDataTypeFactory typeFactory, List<String> names, List<RelDataType> types) {
+ for (ColumnDTO column : schema.getColumns()) {
+ names.add(column.getColumnName());
+ RelDataType type = getSqlTypeFromOpenTSDBType(typeFactory, column.getColumnType());
+ type = typeFactory.createTypeWithNullability(type, column.isNullable());
+ types.add(type);
+ }
+ }
+
+ private RelDataType getSqlTypeFromOpenTSDBType(RelDataTypeFactory typeFactory, OpenTSDBTypes type) {
+ switch (type) {
+ case STRING:
+ return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+ case DOUBLE:
+ return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+ case TIMESTAMP:
+ return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+ default:
+ throw UserException.unsupportedError()
+ .message(String.format("%s is unsupported now. Currently supported types is %s, %s, %s", type, STRING, DOUBLE, TIMESTAMP))
+ .build(log);
+ }
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
new file mode 100644
index 000000000..935aaa503
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(FragmentContext context, OpenTSDBSubScan subScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ List<RecordReader> readers = Lists.newArrayList();
+ List<SchemaPath> columns;
+
+ for (OpenTSDBSubScan.OpenTSDBSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {
+ try {
+ if ((columns = subScan.getColumns()) == null) {
+ columns = GroupScan.ALL_COLUMNS;
+ }
+ readers.add(new OpenTSDBRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns));
+ } catch (Exception e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+ return new ScanBatch(subScan, context, readers);
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
new file mode 100644
index 000000000..47c805ad5
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.madhukaraphatak.sizeof.SizeEstimator;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBSubScan.OpenTSDBSubScanSpec;
+import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
+
+@JsonTypeName("openTSDB-scan")
+public class OpenTSDBGroupScan extends AbstractGroupScan {
+
+ private OpenTSDBStoragePluginConfig storagePluginConfig;
+ private OpenTSDBScanSpec openTSDBScanSpec;
+ private OpenTSDBStoragePlugin storagePlugin;
+
+ private List<SchemaPath> columns;
+
+ @JsonCreator
+ public OpenTSDBGroupScan(@JsonProperty("openTSDBScanSpec") OpenTSDBScanSpec openTSDBScanSpec,
+ @JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+ this((OpenTSDBStoragePlugin) pluginRegistry.getPlugin(openTSDBStoragePluginConfig), openTSDBScanSpec, columns);
+ }
+
+ public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin,
+ OpenTSDBScanSpec scanSpec, List<SchemaPath> columns) {
+ super((String) null);
+ this.storagePlugin = storagePlugin;
+ this.storagePluginConfig = storagePlugin.getConfig();
+ this.openTSDBScanSpec = scanSpec;
+ this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ *
+ * @param that The OpenTSDBGroupScan to clone
+ */
+ private OpenTSDBGroupScan(OpenTSDBGroupScan that) {
+ super((String) null);
+ this.columns = that.columns;
+ this.openTSDBScanSpec = that.openTSDBScanSpec;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginConfig = that.storagePluginConfig;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ }
+
+ @Override
+ public OpenTSDBSubScan getSpecificScan(int minorFragmentId) {
+ List<OpenTSDBSubScanSpec> scanSpecList = Lists.newArrayList();
+ scanSpecList.add(new OpenTSDBSubScanSpec(getTableName()));
+ return new OpenTSDBSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ ServiceImpl client = storagePlugin.getClient();
+ Map<String, String> params = fromRowData(openTSDBScanSpec.getTableName());
+ Set<MetricDTO> allMetrics = client.getAllMetrics(params);
+ long numMetrics = allMetrics.size();
+ float approxDiskCost = 0;
+ if (numMetrics != 0) {
+ MetricDTO metricDTO = allMetrics.iterator().next();
+ // This method estimates the sizes of Java objects (number of bytes of memory they occupy).
+ // more detailed information about how this estimation method work you can find in this article
+ // http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
+ approxDiskCost = SizeEstimator.estimate(metricDTO) * numMetrics;
+ }
+ return new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, numMetrics, 1, approxDiskCost);
+ }
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new OpenTSDBGroupScan(this);
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ @JsonIgnore
+ public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return true;
+ }
+
+ @JsonIgnore
+ public String getTableName() {
+ return getOpenTSDBScanSpec().getTableName();
+ }
+
+ @JsonProperty
+ public OpenTSDBScanSpec getOpenTSDBScanSpec() {
+ return openTSDBScanSpec;
+ }
+
+ @JsonProperty("storage")
+ public OpenTSDBStoragePluginConfig getStoragePluginConfig() {
+ return storagePluginConfig;
+ }
+
+ @JsonProperty
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ OpenTSDBGroupScan newScan = new OpenTSDBGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
+
+ @Override
+ public String toString() {
+ return "OpenTSDBGroupScan [OpenTSDBScanSpec=" + openTSDBScanSpec + ", columns=" + columns
+ + "]";
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java
new file mode 100644
index 000000000..044c23265
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Schema;
+import org.apache.drill.exec.store.openTSDB.client.Service;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
+
+public class OpenTSDBRecordReader extends AbstractRecordReader {
+
+ private static final Logger log = LoggerFactory.getLogger(OpenTSDBRecordReader.class);
+
+ // batch size should not exceed max allowed record count
+ private static final int TARGET_RECORD_COUNT = 4000;
+
+ private static final Map<OpenTSDBTypes, MinorType> TYPES;
+
+ private Service db;
+
+ private Iterator<MetricDTO> tableIterator;
+ private OutputMutator output;
+ private ImmutableList<ProjectedColumnInfo> projectedCols;
+
+ private Map<String, String> params;
+
+ public OpenTSDBRecordReader(Service client, OpenTSDBSubScan.OpenTSDBSubScanSpec subScanSpec,
+ List<SchemaPath> projectedColumns) throws IOException {
+ setColumns(projectedColumns);
+ this.db = client;
+ this.params =
+ fromRowData(subScanSpec.getTableName());
+ log.debug("Scan spec: {}", subScanSpec);
+ }
+
+ @Override
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ this.output = output;
+ Set<MetricDTO> metrics =
+ db.getAllMetrics(params);
+ if (metrics == null) {
+ throw UserException.validationError()
+ .message(String.format("Table '%s' not found", params.get(METRIC_PARAM)))
+ .build(log);
+ }
+ this.tableIterator = metrics.iterator();
+ }
+
+ @Override
+ public int next() {
+ try {
+ return processOpenTSDBTablesData();
+ } catch (SchemaChangeException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ static {
+ TYPES = ImmutableMap.<OpenTSDBTypes, MinorType>builder()
+ .put(OpenTSDBTypes.STRING, MinorType.VARCHAR)
+ .put(OpenTSDBTypes.DOUBLE, MinorType.FLOAT8)
+ .put(OpenTSDBTypes.TIMESTAMP, MinorType.TIMESTAMP)
+ .build();
+ }
+
+ private static class ProjectedColumnInfo {
+ ValueVector vv;
+ ColumnDTO openTSDBColumn;
+ }
+
+ private int processOpenTSDBTablesData() throws SchemaChangeException {
+ int rowCounter = 0;
+ while (tableIterator.hasNext() && rowCounter < TARGET_RECORD_COUNT) {
+ MetricDTO metricDTO = tableIterator.next();
+ rowCounter = addRowResult(metricDTO, rowCounter);
+ }
+ return rowCounter;
+ }
+
+ private int addRowResult(MetricDTO table, int rowCounter) throws SchemaChangeException {
+ setupProjectedColsIfItNull();
+ for (String time : table.getDps().keySet()) {
+ String value = table.getDps().get(time);
+ setupDataToDrillTable(table, time, value, table.getTags(), rowCounter);
+ rowCounter++;
+ }
+ return rowCounter;
+ }
+
+ private void setupProjectedColsIfItNull() throws SchemaChangeException {
+ if (projectedCols == null) {
+ initCols(new Schema(db, params.get(METRIC_PARAM)));
+ }
+ }
+
+ private void setupDataToDrillTable(MetricDTO table, String timestamp, String value, Map<String, String> tags, int rowCount) {
+ for (ProjectedColumnInfo pci : projectedCols) {
+ switch (pci.openTSDBColumn.getColumnName()) {
+ case "metric":
+ setStringColumnValue(table.getMetric(), pci, rowCount);
+ break;
+ case "aggregate tags":
+ setStringColumnValue(table.getAggregateTags().toString(), pci, rowCount);
+ break;
+ case "timestamp":
+ setTimestampColumnValue(timestamp, pci, rowCount);
+ break;
+ case "aggregated value":
+ setDoubleColumnValue(value, pci, rowCount);
+ break;
+ default:
+ setStringColumnValue(tags.get(pci.openTSDBColumn.getColumnName()), pci, rowCount);
+ }
+ }
+ }
+
+ private void setTimestampColumnValue(String timestamp, ProjectedColumnInfo pci, int rowCount) {
+ setTimestampColumnValue(timestamp != null ? Long.parseLong(timestamp) : Long.parseLong("0"), pci, rowCount);
+ }
+
+ private void setDoubleColumnValue(String value, ProjectedColumnInfo pci, int rowCount) {
+ setDoubleColumnValue(value != null ? Double.parseDouble(value) : 0.0, pci, rowCount);
+ }
+
+ private void setStringColumnValue(String data, ProjectedColumnInfo pci, int rowCount) {
+ if (data == null) {
+ data = "null";
+ }
+ ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
+ ((NullableVarCharVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowCount, value, 0, value.remaining());
+ }
+
+ private void setTimestampColumnValue(Long data, ProjectedColumnInfo pci, int rowCount) {
+ ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowCount, data * 1000);
+ }
+
+ private void setDoubleColumnValue(Double data, ProjectedColumnInfo pci, int rowCount) {
+ ((NullableFloat8Vector.Mutator) pci.vv.getMutator())
+ .setSafe(rowCount, data);
+ }
+
+ private void initCols(Schema schema) throws SchemaChangeException {
+ ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+
+ for (int i = 0; i < schema.getColumnCount(); i++) {
+
+ ColumnDTO column = schema.getColumnByIndex(i);
+ final String name = column.getColumnName();
+ final OpenTSDBTypes type = column.getColumnType();
+ TypeProtos.MinorType minorType = TYPES.get(type);
+
+ if (isMinorTypeNull(minorType)) {
+ String message = String.format(
+ "A column you queried has a data type that is not currently supported by the OpenTSDB storage plugin. "
+ + "The column's name was %s and its OpenTSDB data type was %s. ", name, type.toString());
+ throw UserException.unsupportedError()
+ .message(message)
+ .build(log);
+ }
+
+ ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType);
+ pciBuilder.add(pci);
+ }
+ projectedCols = pciBuilder.build();
+ }
+
+ private boolean isMinorTypeNull(MinorType minorType) {
+ return minorType == null;
+ }
+
+ private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, String name, MinorType minorType) throws SchemaChangeException {
+ MajorType majorType = getMajorType(minorType);
+
+ MaterializedField field =
+ MaterializedField.create(name, majorType);
+
+ ValueVector vector =
+ getValueVector(minorType, majorType, field);
+
+ return getProjectedColumnInfo(column, vector);
+ }
+
+ private MajorType getMajorType(MinorType minorType) {
+ MajorType majorType;
+ majorType = Types.optional(minorType);
+ return majorType;
+ }
+
+ private ValueVector getValueVector(MinorType minorType, MajorType majorType, MaterializedField field) throws SchemaChangeException {
+ final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+ minorType, majorType.getMode());
+ ValueVector vector = output.addField(field, clazz);
+ vector.allocateNew();
+ return vector;
+ }
+
+ private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, ValueVector vector) {
+ ProjectedColumnInfo pci = new ProjectedColumnInfo();
+ pci.vv = vector;
+ pci.openTSDBColumn = column;
+ return pci;
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java
new file mode 100644
index 000000000..f93758de9
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OpenTSDBScanSpec {
+
+ private final String tableName;
+
+ @JsonCreator
+ public OpenTSDBScanSpec(@JsonProperty("tableName") String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public String toString() {
+ return "OpenTSDBScanSpec{" +
+ "tableName='" + tableName + '\'' +
+ '}';
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
new file mode 100644
index 000000000..176dff061
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl;
+import org.apache.drill.exec.store.openTSDB.schema.OpenTSDBSchemaFactory;
+
+import java.io.IOException;
+
+public class OpenTSDBStoragePlugin extends AbstractStoragePlugin {
+
+ private final DrillbitContext context;
+
+ private final OpenTSDBStoragePluginConfig engineConfig;
+ private final OpenTSDBSchemaFactory schemaFactory;
+
+ private final ServiceImpl db;
+
+ public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name) throws IOException {
+ this.context = context;
+ this.schemaFactory = new OpenTSDBSchemaFactory(this, name);
+ this.engineConfig = configuration;
+ this.db = new ServiceImpl(configuration.getConnection());
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public OpenTSDBStoragePluginConfig getConfig() {
+ return engineConfig;
+ }
+
+ @Override
+ public OpenTSDBGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ OpenTSDBScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<OpenTSDBScanSpec>() {
+ });
+ return new OpenTSDBGroupScan(this, scanSpec, null);
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+
+ public ServiceImpl getClient() {
+ return db;
+ }
+
+ DrillbitContext getContext() {
+ return this.context;
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
new file mode 100644
index 000000000..1b67c1ddf
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+@JsonTypeName(OpenTSDBStoragePluginConfig.NAME)
+public class OpenTSDBStoragePluginConfig extends StoragePluginConfigBase {
+
+ private static final Logger log = LoggerFactory.getLogger(OpenTSDBStoragePluginConfig.class);
+
+ public static final String NAME = "openTSDB";
+
+ private final String connection;
+
+ @JsonCreator
+ public OpenTSDBStoragePluginConfig(@JsonProperty("connection") String connection) throws IOException {
+ if (connection == null || connection.isEmpty()) {
+ throw UserException.validationError()
+ .message("Connection property must not be null. Check plugin configuration.")
+ .build(log);
+ }
+ this.connection = connection;
+ }
+
+ public String getConnection() {
+ return connection;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OpenTSDBStoragePluginConfig that = (OpenTSDBStoragePluginConfig) o;
+ return Objects.equals(connection, that.connection);
+ }
+
+ @Override
+ public int hashCode() {
+ return connection != null ? connection.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "OpenTSDBStoragePluginConfig{" +
+ "connection='" + connection + '\'' +
+ '}';
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
new file mode 100644
index 000000000..4e938049b
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+@JsonTypeName("openTSDB-sub-scan")
+public class OpenTSDBSubScan extends AbstractBase implements SubScan {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(OpenTSDBSubScan.class);
+
+ public final OpenTSDBStoragePluginConfig storage;
+
+ private final List<SchemaPath> columns;
+ private final OpenTSDBStoragePlugin openTSDBStoragePlugin;
+ private final List<OpenTSDBSubScanSpec> tabletScanSpecList;
+
+ @JsonCreator
+ public OpenTSDBSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("storage") OpenTSDBStoragePluginConfig storage,
+ @JsonProperty("tabletScanSpecList") LinkedList<OpenTSDBSubScanSpec> tabletScanSpecList,
+ @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+ super((String) null);
+ openTSDBStoragePlugin = (OpenTSDBStoragePlugin) registry.getPlugin(storage);
+ this.tabletScanSpecList = tabletScanSpecList;
+ this.storage = storage;
+ this.columns = columns;
+ }
+
+ public OpenTSDBSubScan(OpenTSDBStoragePlugin plugin, OpenTSDBStoragePluginConfig config,
+ List<OpenTSDBSubScanSpec> tabletInfoList, List<SchemaPath> columns) {
+ super((String) null);
+ openTSDBStoragePlugin = plugin;
+ storage = config;
+ this.tabletScanSpecList = tabletInfoList;
+ this.columns = columns;
+ }
+
+ @Override
+ public int getOperatorType() {
+ return 0;
+ }
+
+ @Override
+ public boolean isExecutable() {
+ return false;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ return new OpenTSDBSubScan(openTSDBStoragePlugin, storage, tabletScanSpecList, columns);
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ public List<OpenTSDBSubScanSpec> getTabletScanSpecList() {
+ return tabletScanSpecList;
+ }
+
+ @JsonIgnore
+ public OpenTSDBStoragePlugin getStorageEngine() {
+ return openTSDBStoragePlugin;
+ }
+
+ @JsonProperty("storage")
+ public OpenTSDBStoragePluginConfig getStorageConfig() {
+ return storage;
+ }
+
+ public static class OpenTSDBSubScanSpec {
+
+ private final String tableName;
+
+ @JsonCreator
+ public OpenTSDBSubScanSpec(@JsonProperty("tableName") String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java
new file mode 100644
index 000000000..6e0ef05f7
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.base.Splitter;
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class Util {
+
+ private static final Logger log = LoggerFactory.getLogger(Util.class);
+
+ /**
+ * Parse FROM parameters to Map representation
+ *
+ * @param rowData with this syntax (metric=warp.speed.test)
+ * @return Map with params key: metric, value: warp.speed.test
+ */
+ public static Map<String, String> fromRowData(String rowData) {
+ try {
+ String fromRowData = rowData.replaceAll("[()]", "");
+ return Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator("=").split(fromRowData);
+ } catch (IllegalArgumentException e) {
+ throw UserException.validationError()
+ .message(String.format("Syntax error in the query %s", rowData))
+ .build(log);
+ }
+ }
+
+ /**
+ * @param name Metric name
+ * @return Valid metric name
+ */
+ public static String getValidTableName(String name) {
+ if (!isTableNameValid(name)) {
+ name = fromRowData(name).get("metric");
+ }
+ return name;
+ }
+
+ /**
+ * @param name Metric name
+ * @return true if name is valid
+ */
+ public static boolean isTableNameValid(String name) {
+ return !name.contains("=");
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java
new file mode 100644
index 000000000..1d561c2f0
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.client.query.DBQuery;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+
+import java.util.Set;
+
+/**
+ * Client for API requests to openTSDB
+ */
+public interface OpenTSDB {
+
+ /**
+ * Used for getting all metrics names from openTSDB
+ *
+ * @return Set<String> with all tables names
+ */
+ @GET("api/suggest?type=metrics&max=" + Integer.MAX_VALUE)
+ Call<Set<String>> getAllTablesName();
+
+ /**
+ * Overloaded getTables for POST request to DB
+ *
+ * @param query Query for for selecting data
+ * @return Set<Table> with metrics from openTSDB
+ */
+ @POST("api/query")
+ Call<Set<MetricDTO>> getTables(@Body DBQuery query);
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java
new file mode 100644
index 000000000..2a6b80296
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+/**
+ * Types in openTSDB records,
+ * used for converting openTSDB data to Sql representation
+ */
+public enum OpenTSDBTypes {
+ STRING,
+ DOUBLE,
+ TIMESTAMP
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java
new file mode 100644
index 000000000..2c8dc9f1a
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.DEFAULT_TIME;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.SUM_AGGREGATOR;
+import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Util.getValidTableName;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATED_VALUE;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATE_TAGS;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.METRIC;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.TIMESTAMP;
+
+/**
+ * Abstraction for representing structure of openTSDB table
+ */
+public class Schema {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(Schema.class);
+
+ private final List<ColumnDTO> columns = new ArrayList<>();
+ private final Service db;
+ private final String name;
+
+ public Schema(Service db, String name) {
+ this.db = db;
+ this.name = name;
+ setupStructure();
+ }
+
+ private void setupStructure() {
+ columns.add(new ColumnDTO(METRIC.toString(), OpenTSDBTypes.STRING));
+ columns.add(new ColumnDTO(AGGREGATE_TAGS.toString(), OpenTSDBTypes.STRING));
+ columns.add(new ColumnDTO(TIMESTAMP.toString(), OpenTSDBTypes.TIMESTAMP));
+ columns.add(new ColumnDTO(AGGREGATED_VALUE.toString(), OpenTSDBTypes.DOUBLE));
+ columns.addAll(db.getUnfixedColumns(getParamsForQuery()));
+ }
+
+ /**
+ * Return list with all columns names and its types
+ *
+ * @return List<ColumnDTO>
+ */
+ public List<ColumnDTO> getColumns() {
+ return Collections.unmodifiableList(columns);
+ }
+
+ /**
+ * Number of columns in table
+ *
+ * @return number of table columns
+ */
+ public int getColumnCount() {
+ return columns.size();
+ }
+
+ /**
+ * @param columnIndex index of required column in table
+ * @return ColumnDTO
+ */
+ public ColumnDTO getColumnByIndex(int columnIndex) {
+ return columns.get(columnIndex);
+ }
+
+ // Create map with required params, for querying metrics.
+ // Without this params, we cannot make API request to db.
+ private Map<String, String> getParamsForQuery() {
+ HashMap<String, String> params = new HashMap<>();
+ params.put(METRIC_PARAM, getValidTableName(name));
+ params.put(AGGREGATOR_PARAM, SUM_AGGREGATOR);
+ params.put(TIME_PARAM, DEFAULT_TIME);
+ return params;
+ }
+
+ /**
+ * Structure with constant openTSDB columns
+ */
+ enum DefaultColumns {
+
+ METRIC("metric"),
+ TIMESTAMP("timestamp"),
+ AGGREGATE_TAGS("aggregate tags"),
+ AGGREGATED_VALUE("aggregated value");
+
+ private String columnName;
+
+ DefaultColumns(String name) {
+ this.columnName = name;
+ }
+
+ @Override
+ public String toString() {
+ return columnName;
+ }
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java
new file mode 100644
index 000000000..0be739449
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface Service {
+ /**
+ *
+ * Used for getting all Metrics from openTSDB.
+ * Must be present required params: metric, start, aggregator
+ *
+ * @param queryParam parameters for the API request
+ * @return Set<MetricDTO> all metrics
+ */
+ Set<MetricDTO> getAllMetrics(Map<String, String> queryParam);
+
+ /**
+ *
+ * Used for getting all metrics names from openTSDB
+ *
+ * @return Set<String> metric names
+ */
+ Set<String> getAllMetricNames();
+
+ /**
+ *
+ * Used for getting all non fixed columns based on tags from openTSDB
+ * Must be present required params: metric, start, aggregator
+ *
+ * @param queryParam parameters for the API request
+ * @return List<ColumnDTO> columns based on tags
+ */
+ List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam);
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java
new file mode 100644
index 000000000..e79d0ceaf
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.query;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * DBQuery is an abstraction of an openTSDB query,
+ * that used for extracting data from the storage system by POST request to DB.
+ * <p>
+ * An OpenTSDB query requires at least one sub query,
+ * a means of selecting which time series should be included in the result set.
+ */
+public class DBQuery {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(DBQuery.class);
+ /**
+ * The start time for the query. This can be a relative or absolute timestamp.
+ */
+ private String start;
+ /**
+ * An end time for the query. If not supplied, the TSD will assume the local system time on the server.
+ * This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null
+ * to the db in this field, but in this case db will assume the local system time on the server.
+ */
+ private String end;
+ /**
+ * One or more sub subQueries used to select the time series to return.
+ */
+ private Set<Query> queries;
+
+ private DBQuery(Builder builder) {
+ this.start = builder.start;
+ this.end = builder.end;
+ this.queries = builder.queries;
+ }
+
+ public String getStart() {
+ return start;
+ }
+
+ public String getEnd() {
+ return end;
+ }
+
+ public Set<Query> getQueries() {
+ return queries;
+ }
+
+ public static class Builder {
+
+ private String start;
+ private String end;
+ private Set<Query> queries = new HashSet<>();
+
+ public Builder() {
+ }
+
+ public Builder setStartTime(String startTime) {
+ if (startTime == null) {
+ throw UserException.validationError()
+ .message("start param must be specified")
+ .build(log);
+ }
+ this.start = startTime;
+ return this;
+ }
+
+ public Builder setEndTime(String endTime) {
+ this.end = endTime;
+ return this;
+ }
+
+ public Builder setQueries(Set<Query> queries) {
+ if (queries.isEmpty()) {
+ throw UserException.validationError()
+ .message("Required params such as metric, aggregator weren't specified. " +
+ "Add these params to the query")
+ .build(log);
+ }
+ this.queries = queries;
+ return this;
+ }
+
+ public DBQuery build() {
+ return new DBQuery(this);
+ }
+
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DBQuery dbQuery = (DBQuery) o;
+
+ if (!start.equals(dbQuery.start)) {
+ return false;
+ }
+ if (!end.equals(dbQuery.end)) {
+ return false;
+ }
+ return queries.equals(dbQuery.queries);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = start.hashCode();
+ result = 31 * result + end.hashCode();
+ result = 31 * result + queries.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DBQuery{" +
+ "start='" + start + '\'' +
+ ", end='" + end + '\'' +
+ ", queries=" + queries +
+ '}';
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java
new file mode 100644
index 000000000..bdcd1c462
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.query;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Query is an abstraction of openTSDB subQuery
+ * and it is integral part of DBQuery
+ * <p>
+ * Each sub query can retrieve individual or groups of timeseries data,
+ * performing aggregation on each set.
+ */
+public class Query {
+ private static final Logger log =
+ LoggerFactory.getLogger(Query.class);
+ /**
+ * The name of an aggregation function to use.
+ */
+ private String aggregator;
+ /**
+ * The name of a metric stored in the system
+ */
+ private String metric;
+ /**
+ * Whether or not the data should be converted into deltas before returning.
+ * This is useful if the metric is a continuously incrementing counter
+ * and you want to view the rate of change between data points.
+ */
+ private String rate;
+ /**
+ * An optional downsampling function to reduce the amount of data returned.
+ */
+ private String downsample;
+ /**
+ * To drill down to specific timeseries or group results by tag,
+ * supply one or more map values in the same format as the query string.
+ */
+ private Map<String, String> tags;
+
+ private Query(Builder builder) {
+ this.aggregator = builder.aggregator;
+ this.metric = builder.metric;
+ this.rate = builder.rate;
+ this.downsample = builder.downsample;
+ this.tags = builder.tags;
+ }
+
+ public String getAggregator() {
+ return aggregator;
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public String getRate() {
+ return rate;
+ }
+
+ public String getDownsample() {
+ return downsample;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public static class Builder {
+
+ private String aggregator;
+ private String metric;
+ private String rate;
+ private String downsample;
+ private Map<String, String> tags = new HashMap<>();
+
+ public Builder(String metric) {
+ this.metric = metric;
+ }
+
+ public Builder setAggregator(String aggregator) {
+ if (aggregator == null) {
+ throw UserException.validationError()
+ .message("aggregator param must be specified")
+ .build(log);
+ }
+ this.aggregator = aggregator;
+ return this;
+ }
+
+ public Builder setMetric(String metric) {
+ if (metric == null) {
+ throw UserException.validationError()
+ .message("metric param must be specified")
+ .build(log);
+ }
+ this.metric = metric;
+ return this;
+ }
+
+ public Builder setRate(String rate) {
+ this.rate = rate;
+ return this;
+ }
+
+ public Builder setDownsample(String downsample) {
+ this.downsample = downsample;
+ return this;
+ }
+
+ public Builder setTags(Map<String, String> tags) {
+ this.tags = tags;
+ return this;
+ }
+
+ public Query build() {
+ return new Query(this);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Query subQuery = (Query) o;
+
+ if (aggregator != null ? !aggregator.equals(subQuery.aggregator) : subQuery.aggregator != null) {
+ return false;
+ }
+ if (metric != null ? !metric.equals(subQuery.metric) : subQuery.metric != null) {
+ return false;
+ }
+ if (rate != null ? !rate.equals(subQuery.rate) : subQuery.rate != null) {
+ return false;
+ }
+ if (downsample != null ? !downsample.equals(subQuery.downsample) : subQuery.downsample != null) {
+ return false;
+ }
+ return tags != null ? tags.equals(subQuery.tags) : subQuery.tags == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = aggregator != null ? aggregator.hashCode() : 0;
+ result = 31 * result + (metric != null ? metric.hashCode() : 0);
+ result = 31 * result + (rate != null ? rate.hashCode() : 0);
+ result = 31 * result + (downsample != null ? downsample.hashCode() : 0);
+ result = 31 * result + (tags != null ? tags.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "SubQuery{" +
+ "aggregator='" + aggregator + '\'' +
+ ", metric='" + metric + '\'' +
+ ", rate='" + rate + '\'' +
+ ", downsample='" + downsample + '\'' +
+ ", tags=" + tags +
+ '}';
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
new file mode 100644
index 000000000..41730bd17
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.services;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDB;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Service;
+import org.apache.drill.exec.store.openTSDB.client.query.DBQuery;
+import org.apache.drill.exec.store.openTSDB.client.query.Query;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import retrofit2.Retrofit;
+import retrofit2.converter.jackson.JacksonConverterFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.DOWNSAMPLE_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.END_TIME_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM;
+
+public class ServiceImpl implements Service {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(ServiceImpl.class);
+
+ private final OpenTSDB client;
+
+ public ServiceImpl(String connectionURL) {
+ this.client = new Retrofit.Builder()
+ .baseUrl(connectionURL)
+ .addConverterFactory(JacksonConverterFactory.create())
+ .build()
+ .create(OpenTSDB.class);
+ }
+
+ @Override
+ public Set<MetricDTO> getAllMetrics(Map<String, String> queryParams) {
+ return getAllMetricsByTags(queryParams);
+ }
+
+ @Override
+ public Set<String> getAllMetricNames() {
+ return getTableNames();
+ }
+
+ @Override
+ public List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam) {
+ Set<MetricDTO> metrics = getAllMetricsByTags(queryParam);
+ List<ColumnDTO> unfixedColumns = new ArrayList<>();
+
+ for (MetricDTO metric : metrics) {
+ for (String tag : metric.getTags().keySet()) {
+ ColumnDTO tmp = new ColumnDTO(tag, OpenTSDBTypes.STRING);
+ if (!unfixedColumns.contains(tmp)) {
+ unfixedColumns.add(tmp);
+ }
+ }
+ }
+ return unfixedColumns;
+ }
+
+ private Set<MetricDTO> getAllMetricsByTags(Map<String, String> queryParams) {
+ try {
+ return getAllMetricsFromDBByTags(queryParams);
+ } catch (IOException e) {
+ throw UserException.connectionError(e)
+ .message("Cannot connect to the db. " +
+ "Maybe you have incorrect connection params or db unavailable now")
+ .build(log);
+ }
+ }
+
+ private Set<String> getTableNames() {
+ try {
+ return client.getAllTablesName().execute().body();
+ } catch (IOException e) {
+ throw UserException.connectionError(e)
+ .message("Cannot connect to the db. " +
+ "Maybe you have incorrect connection params or db unavailable now")
+ .build(log);
+ }
+ }
+
+ private Set<MetricDTO> getMetricsByTags(DBQuery base) throws IOException {
+ return client.getTables(base).execute().body();
+ }
+
+ private Set<MetricDTO> getAllMetricsFromDBByTags(Map<String, String> queryParams) throws IOException {
+ Map<String, String> tags = new HashMap<>();
+ DBQuery baseQuery = getConfiguredDbQuery(tags, queryParams);
+
+ Set<MetricDTO> metrics = getBaseMetric(baseQuery);
+ if (metrics == null || metrics.isEmpty()) {
+ throw UserException.validationError()
+ .message(String.format("Table '%s' not found. Please check your query and params", queryParams.get(METRIC_PARAM)))
+ .build(log);
+ }
+ Set<String> extractedTags = getTagsFromMetrics(metrics);
+
+ return getMetricsByTags(extractedTags, queryParams);
+ }
+
+ private Set<MetricDTO> getMetricsByTags(Set<String> extractedTags, Map<String, String> queryParams) throws IOException {
+ Set<MetricDTO> metrics = new HashSet<>();
+ for (String value : extractedTags) {
+ metrics.addAll(getMetricsByTags(getConfiguredDbQuery(getTransformedTag(value), queryParams)));
+ }
+ return metrics;
+ }
+
+ private DBQuery getConfiguredDbQuery(Map<String, String> tags, Map<String, String> queryParams) {
+ Query subQuery = new Query.Builder(queryParams.get(METRIC_PARAM))
+ .setAggregator(queryParams.get(AGGREGATOR_PARAM))
+ .setDownsample(queryParams.get(DOWNSAMPLE_PARAM))
+ .setTags(tags).build();
+
+ Set<Query> queries = new HashSet<>();
+ queries.add(subQuery);
+
+ return new DBQuery.Builder()
+ .setStartTime(queryParams.get(TIME_PARAM))
+ .setEndTime(queryParams.get(END_TIME_PARAM))
+ .setQueries(queries)
+ .build();
+ }
+
+ private Set<MetricDTO> getBaseMetric(DBQuery base) throws IOException {
+ return getMetricsByTags(base);
+ }
+
+ private Set<String> getTagsFromMetrics(Set<MetricDTO> metrics) {
+ Set<String> extractedTags = new HashSet<>();
+
+ for (MetricDTO table : metrics) {
+ extractedTags.addAll(table.getAggregateTags());
+ extractedTags.addAll(table.getTags().keySet());
+ }
+
+ return extractedTags;
+ }
+
+ private Map<String, String> getTransformedTag(String tag) {
+ Map<String, String> tags = new HashMap<>();
+ tags.put(tag, "*");
+ return tags;
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java
new file mode 100644
index 000000000..03c595261
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.dto;
+
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+
+import java.util.Objects;
+
+public class ColumnDTO {
+
+ private final String columnName;
+ private final OpenTSDBTypes columnType;
+
+ public ColumnDTO(String columnName, OpenTSDBTypes columnType) {
+ this.columnName = columnName;
+ this.columnType = columnType;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public OpenTSDBTypes getColumnType() {
+ return columnType;
+ }
+
+ public boolean isNullable() {
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ColumnDTO columnDTO = (ColumnDTO) o;
+ return Objects.equals(columnName, columnDTO.columnName) &&
+ columnType == columnDTO.columnType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnName, columnType);
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java
new file mode 100644
index 000000000..7e6285f3a
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.dto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class MetricDTO {
+
+ private String metric;
+ private Map<String, String> tags;
+ private List<String> aggregateTags;
+ private Map<String, String> dps;
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public List<String> getAggregateTags() {
+ return aggregateTags;
+ }
+
+ public Map<String, String> getDps() {
+ return dps;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MetricDTO metricDTO = (MetricDTO) o;
+ return Objects.equals(metric, metricDTO.metric) &&
+ Objects.equals(tags, metricDTO.tags) &&
+ Objects.equals(aggregateTags, metricDTO.aggregateTags) &&
+ Objects.equals(dps, metricDTO.dps);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metric, tags, aggregateTags, dps);
+ }
+
+ @Override
+ public String toString() {
+ return "Table{" +
+ "metric='" + metric + '\'' +
+ ", tags=" + tags +
+ ", aggregateTags=" + aggregateTags +
+ ", dps=" + dps +
+ '}';
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
new file mode 100644
index 000000000..cca39d8c8
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.openTSDB.DrillOpenTSDBTable;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBScanSpec;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePlugin;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.apache.drill.exec.store.openTSDB.client.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+public class OpenTSDBSchemaFactory implements SchemaFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(OpenTSDBSchemaFactory.class);
+
+ private final String schemaName;
+ private final OpenTSDBStoragePlugin plugin;
+
+ public OpenTSDBSchemaFactory(OpenTSDBStoragePlugin plugin, String schemaName) {
+ this.plugin = plugin;
+ this.schemaName = schemaName;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ OpenTSDBSchema schema = new OpenTSDBSchema(schemaName);
+ parent.add(schemaName, schema);
+ }
+
+ class OpenTSDBSchema extends AbstractSchema {
+
+ OpenTSDBSchema(String name) {
+ super(Collections.<String>emptyList(), name);
+ }
+
+ @Override
+ public Table getTable(String name) {
+ OpenTSDBScanSpec scanSpec = new OpenTSDBScanSpec(name);
+ return new DrillOpenTSDBTable(schemaName, plugin, new Schema(plugin.getClient(), name), scanSpec);
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return plugin.getClient().getAllMetricNames();
+ }
+
+ @Override
+ public String getTypeName() {
+ return OpenTSDBStoragePluginConfig.NAME;
+ }
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-opentsdb/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 000000000..d1055c12b
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+ "storage": {
+ openTSDB: {
+ type: "openTSDB",
+ connection: "http://localhost:10000",
+ enabled: false
+ }
+ }
+}
diff --git a/contrib/storage-opentsdb/src/main/resources/drill-module.conf b/contrib/storage-opentsdb/src/main/resources/drill-module.conf
new file mode 100644
index 000000000..d5743da7b
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/resources/drill-module.conf
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.openTSDB"
+}
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestDataHolder.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestDataHolder.java
new file mode 100644
index 000000000..c6e722845
--- /dev/null
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestDataHolder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.store.openTSDB;
+
+public class TestDataHolder {
+
+ public static final String SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS = "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{\"symbol\":\"VOD.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165399\":196.3000030517578," +
+ "\"1407165402\":196.3000030517578," +
+ "\"1407165405\":196.3000030517578," +
+ "\"1407165407\":196.3000030517578," +
+ "\"1407165410\":196.3000030517578," +
+ "\"1407165422\":196.3000030517578," +
+ "\"1488271956\":111.11000061035156}}," +
+ "{\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{\"symbol\":\"BP.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165399\":484.20001220703125," +
+ "\"1407165403\":484.1499938964844," +
+ "\"1407165405\":484.1499938964844," +
+ "\"1407165408\":484.1499938964844," +
+ "\"1407165419\":484.1499938964844," +
+ "\"1407165423\":484.2550048828125}}," +
+ "{\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{\"symbol\":\"BARC.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165401\":224.14999389648438," +
+ "\"1407165404\":224.14999389648438," +
+ "\"1407165406\":224.14999389648438," +
+ "\"1407165409\":224.14999389648438," +
+ "\"1407165422\":224.14999389648438}" +
+ "}]";
+
+ public static final String SAMPLE_DATA_FOR_GET_TABLE_REQUEST =
+ "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{}," +
+ "\"aggregateTags\":[\"symbol\"]," +
+ "\"dps\":{" +
+ "\"1407165399\":680.5000152587891," +
+ "\"1407165401\":904.625," +
+ "\"1407165402\":904.6124954223633," +
+ "\"1407165403\":904.5999908447266," +
+ "\"1407165404\":904.5999908447266," +
+ "\"1407165405\":904.5999908447266," +
+ "\"1407165406\":904.5999908447266," +
+ "\"1407165407\":904.5999908447266," +
+ "\"1407165408\":904.5999908447266," +
+ "\"1407165409\":904.5999908447266," +
+ "\"1407165410\":904.5999908447266," +
+ "\"1407165419\":904.5999908447266," +
+ "\"1407165422\":904.6787490844727," +
+ "\"1407165423\":680.5550068842233," +
+ "\"1488271956\":111.11000061035156}" +
+ "}]";
+
+ public static final String SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS =
+ "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{\"symbol\":\"VOD.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1261440000\":196.3000030517578," +
+ "\"1419120000\":111.11000061035156}" +
+ "},{" +
+ "\"metric\":\"warp.speed.test\"" +
+ ",\"tags\":{\"symbol\":\"BP.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1261440000\":484.1758321126302}" +
+ "},{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{" +
+ "\"symbol\":\"BARC.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1261440000\":224.14999389648438}" +
+ "}]";
+
+ public static final String SAMPLE_DATA_FOR_GET_TABLE_NAME_REQUEST = "[\"warp.speed.test\"]";
+
+ public static final String SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS =
+ "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{}," +
+ "\"aggregateTags\":[" +
+ "\"symbol\"]," +
+ "\"dps\":{" +
+ "\"1261440000\":904.6258290608723," +
+ "\"1419120000\":111.11000061035156}" +
+ "}]";
+
+ public static final String SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS =
+ "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{}," +
+ "\"aggregateTags\":[" +
+ "\"symbol\"]," +
+ "\"dps\":{" +
+ "\"1407165399\":680.5000152587891," +
+ "\"1407165401\":904.625," +
+ "\"1407165402\":904.6124954223633," +
+ "\"1419120000\":904.5999908447266}" +
+ "}]";
+
+ public static final String DOWNSAMPLE_REQUEST_WTIHOUT_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":null," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":\"5y-avg\"," +
+ "\"tags\":{}" +
+ "}]" +
+ "}";
+
+ public static final String END_PARAM_REQUEST_WTIHOUT_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":\"1407165403000\"," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":null," +
+ "\"tags\":{}" +
+ "}]" +
+ "}";
+
+
+ public static final String DOWNSAMPLE_REQUEST_WITH_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":null," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":\"5y-avg\"," +
+ "\"tags\":{" +
+ "\"symbol\":\"*\"}" +
+ "}]" +
+ "}";
+
+ public static final String END_PARAM_REQUEST_WITH_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":\"1407165403000\"," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":null," +
+ "\"tags\":{" +
+ "\"symbol\":\"*\"}" +
+ "}]" +
+ "}";
+
+ public static final String REQUEST_TO_NONEXISTENT_METRIC =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":null," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.spee\"," +
+ "\"rate\":null," +
+ "\"downsample\":null," +
+ "\"tags\":{" + "}" +
+ "}]" +
+ "}";
+
+
+ public static final String POST_REQUEST_WITHOUT_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":null," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":null," +
+ "\"tags\":{}" +
+ "}]" +
+ "}";
+
+
+ public static final String POST_REQUEST_WITH_TAGS =
+ "{" +
+ "\"start\":\"47y-ago\"," +
+ "\"end\":null," +
+ "\"queries\":[{" +
+ "\"aggregator\":\"sum\"," +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"rate\":null," +
+ "\"downsample\":null," +
+ "\"tags\":{" +
+ "\"symbol\":\"*\"}" +
+ "}]" +
+ "}";
+
+ public static final String SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS =
+ "[{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{\"symbol\":\"VOD.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165399\":196.3000030517578," +
+ "\"1407165402\":196.3000030517578}" +
+ "},{" +
+ "\"metric\":\"warp.speed.test\"" +
+ ",\"tags\":{\"symbol\":\"BP.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165399\":484.20001220703125," +
+ "\"1407165403\":484.1499938964844}" +
+ "},{" +
+ "\"metric\":\"warp.speed.test\"," +
+ "\"tags\":{" +
+ "\"symbol\":\"BARC.L\"}," +
+ "\"aggregateTags\":[]," +
+ "\"dps\":{" +
+ "\"1407165401\":224.14999389648438}" +
+ "}]";
+}
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
new file mode 100644
index 000000000..2d6c50636
--- /dev/null
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.store.openTSDB;
+
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePlugin;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.apache.drill.store.openTSDB.TestDataHolder.DOWNSAMPLE_REQUEST_WITH_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.DOWNSAMPLE_REQUEST_WTIHOUT_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.END_PARAM_REQUEST_WITH_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.END_PARAM_REQUEST_WTIHOUT_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.POST_REQUEST_WITHOUT_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.POST_REQUEST_WITH_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.REQUEST_TO_NONEXISTENT_METRIC;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_GET_TABLE_NAME_REQUEST;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_GET_TABLE_REQUEST;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS;
+import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS;
+
+public class TestOpenTSDBPlugin extends PlanTestBase {
+
+ protected static OpenTSDBStoragePlugin storagePlugin;
+ protected static OpenTSDBStoragePluginConfig storagePluginConfig;
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(10000);
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ storagePlugin = (OpenTSDBStoragePlugin) pluginRegistry.getPlugin(OpenTSDBStoragePluginConfig.NAME);
+ storagePluginConfig = storagePlugin.getConfig();
+ storagePluginConfig.setEnabled(true);
+ pluginRegistry.createOrUpdate(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig, true);
+ }
+
+ @Before
+ public void init() {
+ setupPostStubs();
+ setupGetStubs();
+ }
+
+ private void setupGetStubs() {
+ wireMockRule.stubFor(get(urlEqualTo("/api/suggest?type=metrics&max=" + Integer.MAX_VALUE))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_GET_TABLE_NAME_REQUEST)));
+
+ wireMockRule.stubFor(get(urlEqualTo("/api/query?start=47y-ago&m=sum:warp.speed.test"))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withBody(SAMPLE_DATA_FOR_GET_TABLE_REQUEST)
+ ));
+ }
+
+ private void setupPostStubs() {
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(POST_REQUEST_WITHOUT_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_GET_TABLE_REQUEST)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(POST_REQUEST_WITH_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(DOWNSAMPLE_REQUEST_WTIHOUT_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(DOWNSAMPLE_REQUEST_WITH_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS)));
+
+ wireMockRule.stubFor(post(urlEqualTo("/api/query"))
+ .withRequestBody(equalToJson(REQUEST_TO_NONEXISTENT_METRIC))
+ .willReturn(aResponse()
+ .withStatus(400)
+ .withHeader("Content-Type", "application/json")
+ ));
+ }
+
+ @Test
+ public void testBasicQueryFromWithRequiredParams() throws Exception {
+ String query =
+ "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
+ Assert.assertEquals(18, testSql(query));
+ }
+
+ @Test
+ public void testBasicQueryGroupBy() throws Exception {
+ String query =
+ "select `timestamp`, sum(`aggregated value`) from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago)` group by `timestamp`";
+ Assert.assertEquals(15, testSql(query));
+ }
+
+ @Test
+ public void testBasicQueryFromWithInterpolationParam() throws Exception {
+ String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, downsample=5y-avg)`";
+ Assert.assertEquals(4, testSql(query));
+ }
+
+ @Test
+ public void testBasicQueryFromWithEndParam() throws Exception {
+ String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, end=1407165403000))`";
+ Assert.assertEquals(5, testSql(query));
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testBasicQueryWithoutTableName() throws Exception {
+ test("select * from openTSDB.``;");
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testBasicQueryWithNonExistentTableName() throws Exception {
+ test("select * from openTSDB.`warp.spee`");
+ }
+
+ @Test
+ public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
+ String query = "EXPLAIN PLAN for select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
+ String plan = getPlanInString(query, JSON_FORMAT);
+ Assert.assertEquals(18, testPhysical(plan));
+ }
+
+ @Test
+ public void testDescribe() throws Exception {
+ test("use openTSDB");
+ test("describe `warp.speed.test`");
+ Assert.assertEquals(1, testSql("show tables"));
+ }
+}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 86c3d111c..9bb21d6bb 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -223,6 +223,11 @@
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-opentsdb-storage</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-mongo-storage</artifactId>
<version>${project.version}</version>
</dependency>