aboutsummaryrefslogtreecommitdiff
path: root/contrib/storage-jdbc/src
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2015-08-01 18:11:51 -0700
committerJacques Nadeau <jacques@apache.org>2015-09-13 18:26:44 -0700
commite12cd470e4ab57b025840fdfa200a051a01df029 (patch)
tree09563622a7a4ccfdd21a6dc29a8927b7c28e3106 /contrib/storage-jdbc/src
parent8478e9fb1d7e6881d8f092ae9ff3a338f2e023a6 (diff)
DRILL-3180: JDBC Storage Plugin updates.
- Move to leverage Calcite's JDBC adapter capabilities for pushdowns, schema, etc. - Add test cases using Derby
Diffstat (limited to 'contrib/storage-jdbc/src')
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java119
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java42
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java51
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java119
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java107
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java79
-rw-r--r--contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java120
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java431
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java120
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java400
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java71
-rwxr-xr-xcontrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json10
-rwxr-xr-xcontrib/storage-jdbc/src/main/resources/drill-module.conf18
-rw-r--r--contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java181
-rwxr-xr-xcontrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json10
-rw-r--r--contrib/storage-jdbc/src/test/resources/logback.xml48
16 files changed, 1926 insertions, 0 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
new file mode 100644
index 000000000..bbb4daf6c
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcRules;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+abstract class DrillJdbcRuleBase extends ConverterRule {
+
+ protected final LoadingCache<RexNode, Boolean> checkedExpressions = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build(
+ new CacheLoader<RexNode, Boolean>() {
+ public Boolean load(RexNode expr) {
+ return JdbcExpressionCheck.isOnlyStandardExpressions(expr);
+ }
+ });
+
+ protected final JdbcConvention out;
+
+ private DrillJdbcRuleBase(Class<? extends RelNode> clazz, RelTrait in, JdbcConvention out, String description) {
+ super(clazz, in, out, description);
+ this.out = out;
+ }
+
+ static class DrillJdbcProjectRule extends DrillJdbcRuleBase {
+
+ public DrillJdbcProjectRule(JdbcConvention out) {
+ super(LogicalProject.class, Convention.NONE, out, "JdbcProjectRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ LogicalProject project = (LogicalProject) rel;
+ return new JdbcRules.JdbcProject(rel.getCluster(), rel.getTraitSet().replace(this.out), convert(
+ project.getInput(), project.getInput().getTraitSet().replace(this.out)), project.getProjects(),
+ project.getRowType());
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ try {
+
+ final LogicalProject project = (LogicalProject) call.rel(0);
+ for (RexNode node : project.getChildExps()) {
+ if (!checkedExpressions.get(node)) {
+ return false;
+ }
+ }
+ return true;
+
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failure while trying to evaluate pushdown.", e);
+ }
+ }
+ }
+
+ static class DrillJdbcFilterRule extends DrillJdbcRuleBase {
+
+ public DrillJdbcFilterRule(JdbcConvention out) {
+ super(LogicalFilter.class, Convention.NONE, out, "DrillJdbcFilterRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ LogicalFilter filter = (LogicalFilter) rel;
+
+ return new JdbcRules.JdbcFilter(rel.getCluster(), rel.getTraitSet().replace(this.out), convert(filter.getInput(),
+ filter.getInput().getTraitSet().replace(this.out)), filter.getCondition());
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ try {
+
+ final LogicalFilter filter = (LogicalFilter) call.rel(0);
+ for (RexNode node : filter.getChildExps()) {
+ if (!checkedExpressions.get(node)) {
+ return false;
+ }
+ }
+ return true;
+
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failure while trying to evaluate pushdown.", e);
+ }
+ }
+ }
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
new file mode 100755
index 000000000..fa44b5505
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+//import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+
+public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
+ @Override
+ public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ JdbcStoragePlugin plugin = config.getPlugin();
+ RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
+ return new ScanBatch(config, context, Collections.singletonList(reader).iterator());
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
new file mode 100644
index 000000000..52dd29f77
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.logical.DrillImplementor;
+import org.apache.drill.exec.planner.logical.DrillRel;
+
+public class JdbcDrel extends SingleRel implements DrillRel {
+
+ public JdbcDrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+ super(cluster, traits, child);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new JdbcDrel(getCluster(), traitSet, inputs.iterator().next());
+ }
+
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return copy(getTraitSet(), getInputs());
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
new file mode 100644
index 000000000..2015a7754
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+
+/**
+ * Visitor class that determines whether or not a particular RexNode expression tree contains only standard expressions.
+ * If RexNode tree contains Drill specific expressions, the tree will respond with false.
+ */
+class JdbcExpressionCheck implements RexVisitor<Boolean> {
+
+ private static final JdbcExpressionCheck INSTANCE = new JdbcExpressionCheck();
+
+ public static boolean isOnlyStandardExpressions(RexNode rex) {
+ return rex.accept(INSTANCE);
+ }
+
+ @Override
+ public Boolean visitInputRef(RexInputRef paramRexInputRef) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitLocalRef(RexLocalRef paramRexLocalRef) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitLiteral(RexLiteral paramRexLiteral) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitCall(RexCall paramRexCall) {
+ if(paramRexCall.getOperator() instanceof DrillSqlOperator){
+ return false;
+ }else{
+ for (RexNode operand : paramRexCall.operands) {
+ if (!operand.accept(this)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public Boolean visitOver(RexOver over) {
+ if (!visitCall(over)) {
+ return false;
+ }
+ ;
+
+ final RexWindow window = over.getWindow();
+ for (RexFieldCollation orderKey : window.orderKeys) {
+ if (!((RexNode) orderKey.left).accept(this)) {
+ return false;
+ }
+ }
+
+ for (RexNode partitionKey : window.partitionKeys) {
+ if (!partitionKey.accept(this)) {
+ return false;
+ }
+ }
+
+ return true;
+
+ }
+
+ @Override
+ public Boolean visitCorrelVariable(RexCorrelVariable paramRexCorrelVariable) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitDynamicParam(RexDynamicParam paramRexDynamicParam) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitRangeRef(RexRangeRef paramRexRangeRef) {
+ return true;
+ }
+
+ @Override
+ public Boolean visitFieldAccess(RexFieldAccess paramRexFieldAccess) {
+ return paramRexFieldAccess.getReferenceExpr().accept(this);
+ }
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
new file mode 100644
index 000000000..95b03cf50
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("jdbc-scan")
+public class JdbcGroupScan extends AbstractGroupScan {
+
+ private final String sql;
+ private final JdbcStoragePlugin plugin;
+ private final double rows;
+
+ @JsonCreator
+ public JdbcGroupScan(
+ @JsonProperty("sql") String sql,
+ @JsonProperty("config") StoragePluginConfig config,
+ @JsonProperty("rows") double rows,
+ @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
+ super("");
+ this.sql = sql;
+ this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+ this.rows = rows;
+ }
+
+ JdbcGroupScan(String sql, JdbcStoragePlugin plugin, double rows) {
+ super("");
+ this.sql = sql;
+ this.plugin = plugin;
+ this.rows = rows;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ return new JdbcSubScan(sql, plugin);
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ return new ScanStats(
+ GroupScanProperty.NO_EXACT_ROW_COUNT,
+ (long) Math.max(rows, 1),
+ 1,
+ 1);
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ @Override
+ public String getDigest() {
+ return sql + String.valueOf(plugin.getConfig());
+ }
+
+ public StoragePluginConfig getConfig() {
+ return plugin.getConfig();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new JdbcGroupScan(sql, plugin, rows);
+ }
+
+
+
+} \ No newline at end of file
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
new file mode 100644
index 000000000..0adb5e0b0
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+/**
+ * Prel used to represent a JDBC Conversion within an expression tree. This Prel will replaced with a full JdbcPrel
+ * before execution can happen.
+ */
+public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
+
+ public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+ super(cluster, traits, child);
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new JdbcIntermediatePrel(getCluster(), traitSet, getInput());
+ }
+
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return copy(getTraitSet(), getInputs());
+ }
+
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public Prel finalizeRel() {
+ return new JdbcPrel(getCluster(), getTraitSet(), this);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
new file mode 100644
index 000000000..2433fbd3d
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin.DrillJdbcConvention;
+
+/**
+ * Represents a JDBC Plan once the children nodes have been rewritten into SQL.
+ */
+public class JdbcPrel extends AbstractRelNode implements Prel {
+
+ private final String sql;
+ private final double rows;
+ private final DrillJdbcConvention convention;
+
+ public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel) {
+ super(cluster, traitSet);
+ final RelNode input = prel.getInput();
+ rows = input.getRows();
+ convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
+
+ // generate sql for tree.
+ final SqlDialect dialect = convention.getPlugin().getDialect();
+ final JdbcImplementor jdbcImplementor = new JdbcImplementor(
+ dialect,
+ (JavaTypeFactory) getCluster().getTypeFactory());
+ final JdbcImplementor.Result result =
+ jdbcImplementor.visitChild(0, input.accept(new SubsetRemover()));
+ sql = result.asQuery().toSqlString(dialect).getSql();
+ rowType = input.getRowType();
+ }
+
+ private class SubsetRemover extends RelShuttleImpl {
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof RelSubset) {
+ return ((RelSubset) other).getBest().accept(this);
+ } else {
+ return super.visit(other);
+ }
+ }
+
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ return new JdbcGroupScan(sql, convention.getPlugin(), rows);
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("sql", sql);
+ }
+
+ @Override
+ public double getRows() {
+ return rows;
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+ @Override
+ public SelectionVectorMode[] getSupportedEncodings() {
+ return SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
new file mode 100755
index 000000000..69c45c2f4
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import javax.sql.DataSource;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+@SuppressWarnings("unchecked")
+class JdbcRecordReader extends AbstractRecordReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(JdbcRecordReader.class);
+
+ private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
+ private final DataSource source;
+ private ResultSet resultSet;
+ private final String storagePluginName;
+ private FragmentContext fragmentContext;
+ private Connection connection;
+ private Statement statement;
+ private final String sql;
+ private ImmutableList<ValueVector> vectors;
+ private ImmutableList<Copier<?>> copiers;
+
+ private OperatorContext operatorContext;
+
+ public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) {
+ this.fragmentContext = fragmentContext;
+ this.source = source;
+ this.sql = sql;
+ this.storagePluginName = storagePluginName;
+ }
+
+ static {
+ JDBC_TYPE_MAPPINGS = (ImmutableMap<Integer, MinorType>) (Object) ImmutableMap.builder()
+ .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
+ .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
+ .put(java.sql.Types.TINYINT, MinorType.INT)
+ .put(java.sql.Types.SMALLINT, MinorType.INT)
+ .put(java.sql.Types.INTEGER, MinorType.INT)
+ .put(java.sql.Types.BIGINT, MinorType.BIGINT)
+
+ .put(java.sql.Types.CHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
+
+ .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
+
+ .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
+ .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
+
+ .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
+ .put(java.sql.Types.DECIMAL, MinorType.FLOAT8)
+ .put(java.sql.Types.REAL, MinorType.FLOAT8)
+
+ .put(java.sql.Types.DATE, MinorType.DATE)
+ .put(java.sql.Types.TIME, MinorType.TIME)
+ .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
+
+ .put(java.sql.Types.BOOLEAN, MinorType.BIT)
+
+ .build();
+ }
+
+ private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
+
+ if (v instanceof NullableBigIntVector) {
+ return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableFloat4Vector) {
+ return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator());
+ } else if (v instanceof NullableFloat8Vector) {
+ return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator());
+ } else if (v instanceof NullableIntVector) {
+ return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableVarCharVector) {
+ return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableVarBinaryVector) {
+ return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableDateVector) {
+ return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableTimeVector) {
+ return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableTimeStampVector) {
+ return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator());
+ } else if (v instanceof NullableBitVector) {
+ return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator());
+ }
+
+ throw new IllegalArgumentException("Unknown how to handle vector.");
+ }
+
+ @Override
+ public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+ try {
+
+ this.operatorContext = operatorContext;
+ connection = source.getConnection();
+ statement = connection.createStatement();
+ resultSet = statement.executeQuery(sql);
+
+ final ResultSetMetaData meta = resultSet.getMetaData();
+ final int columns = meta.getColumnCount();
+ ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
+ ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
+
+ for (int i = 1; i <= columns; i++) {
+ final String name = meta.getColumnLabel(i);
+ final int jdbcType = meta.getColumnType(i);
+ final int width = meta.getPrecision(i);
+ final int scale = meta.getScale(i);
+ MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
+ if (minorType == null) {
+ throw UserException.dataReadError()
+ .message("The JDBC storage plugin failed while trying to execute a query. "
+ + "The JDBC data type %d is not currently supported.", jdbcType)
+
+ .addContext("sql", sql)
+ .addContext("plugin", storagePluginName)
+ .build(logger);
+ }
+
+ final MajorType type = Types.optional(minorType);
+ final MaterializedField field = MaterializedField.create(name, type);
+ final Class<? extends ValueVector> clazz = (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(
+ minorType, type.getMode());
+ ValueVector vector = output.addField(field, clazz);
+ vectorBuilder.add(vector);
+ copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
+
+ }
+
+ vectors = vectorBuilder.build();
+ copiers = copierBuilder.build();
+
+ } catch (SQLException | SchemaChangeException e) {
+ throw UserException.dataReadError(e)
+ .message("The JDBC storage plugin failed while trying setup the SQL query. ")
+ .addContext("sql", sql)
+ .addContext("plugin", storagePluginName)
+ .build(logger);
+ }
+ }
+
+
+ @Override
+ public int next() {
+ int counter = 0;
+ Boolean b = true;
+ try {
+ while (counter < 4095 && b == true) { // loop at 4095 since nullables use one more than record count and we
+ // allocate on powers of two.
+ b = resultSet.next();
+ if(b == false) {
+ break;
+ }
+ for (Copier<?> c : copiers) {
+ c.copy(counter);
+ }
+ counter++;
+ }
+ } catch (SQLException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failure while attempting to read from database.")
+ .addContext("sql", sql)
+ .addContext("plugin", storagePluginName)
+ .build(logger);
+ }
+
+ for (ValueVector vv : vectors) {
+ vv.getMutator().setValueCount(counter > 0 ? counter : 0);
+ }
+
+ return counter>0 ? counter : 0;
+ }
+
+ @Override
+ public void cleanup() {
+ AutoCloseables.close(resultSet, logger);
+ AutoCloseables.close(statement, logger);
+ AutoCloseables.close(connection, logger);
+ }
+
+ private abstract class Copier<T extends ValueVector.Mutator> {
+ protected final int columnIndex;
+ protected final ResultSet result;
+ protected final T mutator;
+
+ public Copier(int columnIndex, ResultSet result, T mutator) {
+ super();
+ this.columnIndex = columnIndex;
+ this.result = result;
+ this.mutator = mutator;
+ }
+
+ abstract void copy(int index) throws SQLException;
+ }
+
+ private class IntCopier extends Copier<NullableIntVector.Mutator> {
+ public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
+ super(offset, set, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ mutator.setSafe(index, result.getInt(columnIndex));
+ if (result.wasNull()) {
+ mutator.setNull(index);
+ }
+ }
+ }
+
+ private class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
+ public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
+ super(offset, set, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ mutator.setSafe(index, result.getLong(columnIndex));
+ if (result.wasNull()) {
+ mutator.setNull(index);
+ }
+ }
+
+ }
+
+ private class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
+
+ public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ mutator.setSafe(index, result.getFloat(columnIndex));
+ if (result.wasNull()) {
+ mutator.setNull(index);
+ }
+ }
+
+ }
+
+
+ private class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
+
+ public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ mutator.setSafe(index, result.getDouble(columnIndex));
+ if (result.wasNull()) {
+ mutator.setNull(index);
+ }
+
+ }
+
+ }
+
+ private class DecimalCopier extends Copier<NullableFloat8Vector.Mutator> {
+
+ public DecimalCopier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ BigDecimal decimal = result.getBigDecimal(columnIndex);
+ if (decimal != null) {
+ mutator.setSafe(index, decimal.doubleValue());
+ }
+ }
+
+ }
+
+ private class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
+
+ public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ String val = resultSet.getString(columnIndex);
+ if (val != null) {
+ byte[] record = val.getBytes(Charsets.UTF_8);
+ mutator.setSafe(index, record, 0, record.length);
+ }
+ }
+
+ }
+
+ private class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
+
+ public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ byte[] record = result.getBytes(columnIndex);
+ if (record != null) {
+ mutator.setSafe(index, record, 0, record.length);
+ }
+ }
+
+ }
+
+ private class DateCopier extends Copier<NullableDateVector.Mutator> {
+
+ public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ Date date = result.getDate(columnIndex);
+ if (date != null) {
+ mutator.setSafe(index, date.getTime());
+ }
+ }
+
+ }
+
+ private class TimeCopier extends Copier<NullableTimeVector.Mutator> {
+
+ public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ Time time = result.getTime(columnIndex);
+ if (time != null) {
+ mutator.setSafe(index, (int) time.getTime());
+ }
+
+ }
+
+ }
+
+ private class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
+
+ public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ Timestamp stamp = result.getTimestamp(columnIndex);
+ if (stamp != null) {
+ mutator.setSafe(index, stamp.getTime());
+ }
+
+ }
+
+ }
+
+ private class BitCopier extends Copier<NullableBitVector.Mutator> {
+
+ public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
+ super(columnIndex, result, mutator);
+ }
+
+ @Override
+ void copy(int index) throws SQLException {
+ mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
+ if (result.wasNull()) {
+ mutator.setNull(index);
+ }
+ }
+
+ }
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
new file mode 100755
index 000000000..5a921d400
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(JdbcStorageConfig.NAME)
+public class JdbcStorageConfig extends StoragePluginConfig {
+
+ public static final String NAME = "jdbc";
+
+ private final String driver;
+ private final String url;
+ private final String username;
+ private final String password;
+
+ @JsonCreator
+ public JdbcStorageConfig(
+ @JsonProperty("driver") String driver,
+ @JsonProperty("url") String url,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password) {
+ super();
+ this.driver = driver;
+ this.url = url;
+ this.username = username;
+ this.password = password;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((driver == null) ? 0 : driver.hashCode());
+ result = prime * result + ((password == null) ? 0 : password.hashCode());
+ result = prime * result + ((url == null) ? 0 : url.hashCode());
+ result = prime * result + ((username == null) ? 0 : username.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ JdbcStorageConfig other = (JdbcStorageConfig) obj;
+ if (driver == null) {
+ if (other.driver != null) {
+ return false;
+ }
+ } else if (!driver.equals(other.driver)) {
+ return false;
+ }
+ if (password == null) {
+ if (other.password != null) {
+ return false;
+ }
+ } else if (!password.equals(other.password)) {
+ return false;
+ }
+ if (url == null) {
+ if (other.url != null) {
+ return false;
+ }
+ } else if (!url.equals(other.url)) {
+ return false;
+ }
+ if (username == null) {
+ if (other.username != null) {
+ return false;
+ }
+ } else if (!username.equals(other.username)) {
+ return false;
+ }
+ return true;
+ }
+
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
new file mode 100755
index 000000000..f27f6f106
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcRules;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin;
+import org.apache.calcite.adapter.jdbc.JdbcSchema;
+import org.apache.calcite.linq4j.tree.ConstantUntypedNull;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcFilterRule;
+import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcProjectRule;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+public class JdbcStoragePlugin extends AbstractStoragePlugin {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JdbcStoragePlugin.class);
+
+ // Rules from Calcite's JdbcRules class that we want to avoid using.
+ private static String[] RULES_TO_AVOID = {
+ "JdbcToEnumerableConverterRule", "JdbcFilterRule", "JdbcProjectRule"
+ };
+
+
+ private final JdbcStorageConfig config;
+ private final DrillbitContext context;
+ private final DataSource source;
+ private final String name;
+ private final SqlDialect dialect;
+ private final DrillJdbcConvention convention;
+
+
+ public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
+ this.context = context;
+ this.config = config;
+ this.name = name;
+ BasicDataSource source = new BasicDataSource();
+ source.setDriverClassName(config.getDriver());
+ source.setUrl(config.getUrl());
+
+ if (config.getUsername() != null) {
+ source.setUsername(config.getUsername());
+ }
+
+ if (config.getPassword() != null) {
+ source.setPassword(config.getPassword());
+ }
+
+ this.source = source;
+ this.dialect = JdbcSchema.createDialect(source);
+ this.convention = new DrillJdbcConvention(dialect, name);
+ }
+
+
+ class DrillJdbcConvention extends JdbcConvention {
+
+ private final ImmutableSet<RelOptRule> rules;
+
+ public DrillJdbcConvention(SqlDialect dialect, String name) {
+ super(dialect, ConstantUntypedNull.INSTANCE, name);
+
+
+ // build rules for this convention.
+ ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+
+ builder.add(JDBC_PRULE_INSTANCE);
+ builder.add(new JdbcDrelConverterRule(this));
+ builder.add(new DrillJdbcProjectRule(this));
+ builder.add(new DrillJdbcFilterRule(this));
+
+ outside: for (RelOptRule rule : JdbcRules.rules(this)) {
+ final String description = rule.toString();
+
+ // we want to black list some rules but the parent Calcite package is all or none.
+ // Therefore, we remove rules with names we don't like.
+ for(String black : RULES_TO_AVOID){
+ if(description.equals(black)){
+ continue outside;
+ }
+
+ }
+
+ builder.add(rule);
+ }
+
+ builder.add(FilterSetOpTransposeRule.INSTANCE);
+ builder.add(ProjectRemoveRule.INSTANCE);
+
+ rules = builder.build();
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ for (RelOptRule rule : rules) {
+ planner.addRule(rule);
+ }
+ }
+
+ public Set<RelOptRule> getRules() {
+ return rules;
+ }
+
+ public JdbcStoragePlugin getPlugin() {
+ return JdbcStoragePlugin.this;
+ }
+ }
+
+ /**
+ * Returns whether a condition is supported by {@link JdbcJoin}.
+ *
+ * <p>Corresponds to the capabilities of
+ * {@link JdbcJoin#convertConditionToSqlNode}.
+ *
+ * @param node Condition
+ * @return Whether condition is supported
+ */
+ private static boolean canJoinOnCondition(RexNode node) {
+ final List<RexNode> operands;
+ switch (node.getKind()) {
+ case AND:
+ case OR:
+ operands = ((RexCall) node).getOperands();
+ for (RexNode operand : operands) {
+ if (!canJoinOnCondition(operand)) {
+ return false;
+ }
+ }
+ return true;
+
+ case EQUALS:
+ case IS_NOT_DISTINCT_FROM:
+ case NOT_EQUALS:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ operands = ((RexCall) node).getOperands();
+ if ((operands.get(0) instanceof RexInputRef)
+ && (operands.get(1) instanceof RexInputRef)) {
+ return true;
+ }
+ // fall through
+
+ default:
+ return false;
+ }
+ }
+
+
+ private static final JdbcPrule JDBC_PRULE_INSTANCE = new JdbcPrule();
+
+ private static class JdbcPrule extends ConverterRule {
+
+ private JdbcPrule() {
+ super(JdbcDrel.class, DrillRel.DRILL_LOGICAL, Prel.DRILL_PHYSICAL, "JDBC_PREL_Converter");
+ }
+
+ @Override
+ public RelNode convert(RelNode in) {
+
+ return new JdbcIntermediatePrel(
+ in.getCluster(),
+ in.getTraitSet().replace(getOutTrait()),
+ in.getInput(0));
+ }
+
+ }
+
+ private class JdbcDrelConverterRule extends ConverterRule {
+
+ public JdbcDrelConverterRule(DrillJdbcConvention in) {
+ super(RelNode.class, in, DrillRel.DRILL_LOGICAL, "JDBC_DREL_Converter" + in.getName());
+ }
+
+ @Override
+ public RelNode convert(RelNode in) {
+ return new JdbcDrel(in.getCluster(), in.getTraitSet().replace(DrillRel.DRILL_LOGICAL),
+ convert(in, in.getTraitSet().replace(this.getInTrait())));
+ }
+
+ }
+
+ private class CapitalizingJdbcSchema extends AbstractSchema {
+
+ private final JdbcSchema inner;
+
+ public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
+ SqlDialect dialect, JdbcConvention convention, String catalog, String schema) {
+ super(parentSchemaPath, name);
+ inner = new JdbcSchema(dataSource, dialect, convention, catalog, schema);
+ }
+
+ @Override
+ public String getTypeName() {
+ return JdbcStorageConfig.NAME;
+ }
+
+ @Override
+ public Collection<Function> getFunctions(String name) {
+ return inner.getFunctions(name);
+ }
+
+ @Override
+ public Set<String> getFunctionNames() {
+ return inner.getFunctionNames();
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return inner.getSubSchema(name);
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return inner.getSubSchemaNames();
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return inner.getTableNames();
+ }
+
+ @Override
+ public Table getTable(String name) {
+ Table table = inner.getTable(name);
+ if (table != null) {
+ return table;
+ }
+ return inner.getTable(name.toUpperCase());
+
+ }
+
+ }
+
+ private class JdbcCatalogSchema extends AbstractSchema {
+
+ private final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap();
+ private final CapitalizingJdbcSchema defaultSchema;
+
+ public JdbcCatalogSchema(String name) {
+ super(ImmutableList.<String> of(), name);
+
+ try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) {
+ while (set.next()) {
+ final String catalogName = set.getString(1);
+ CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect,
+ convention, catalogName, null);
+ schemaMap.put(catalogName, schema);
+ }
+ } catch (SQLException e) {
+ logger.warn("Failure while attempting to load JDBC schema.", e);
+ }
+
+ // unable to read general catalog
+ if (schemaMap.isEmpty()) {
+ schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
+ convention,
+ null, null));
+ }
+
+ defaultSchema = schemaMap.values().iterator().next();
+
+ }
+
+ @Override
+ public String getTypeName() {
+ return JdbcStorageConfig.NAME;
+ }
+
+ @Override
+ public Schema getDefaultSchema() {
+ return defaultSchema;
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return schemaMap.get(name);
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return schemaMap.keySet();
+ }
+
+ @Override
+ public Table getTable(String name) {
+ Schema schema = getDefaultSchema();
+ if (schema != null) {
+ Table t = schema.getTable(name);
+ if (t != null) {
+ return t;
+ }
+ return schema.getTable(name.toUpperCase());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return defaultSchema.getTableNames();
+ }
+
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
+ JdbcCatalogSchema schema = new JdbcCatalogSchema(name);
+ parent.add(name, schema);
+ }
+
+ @Override
+ public JdbcStorageConfig getConfig() {
+ return config;
+ }
+
+ public DrillbitContext getContext() {
+ return this.context;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ public DataSource getSource() {
+ return source;
+ }
+
+ public SqlDialect getDialect() {
+ return dialect;
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<RelOptRule> getOptimizerRules(OptimizerRulesContext context) {
+ return convention.getRules();
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
new file mode 100755
index 000000000..fcafd4c27
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("jdbc-sub-scan")
+public class JdbcSubScan extends AbstractSubScan {
+
+ private final String sql;
+ private final JdbcStoragePlugin plugin;
+
+ @JsonCreator
+ public JdbcSubScan(
+ @JsonProperty("sql") String sql,
+ @JsonProperty("config") StoragePluginConfig config,
+ @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
+ super("");
+ this.sql = sql;
+ this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+ }
+
+ JdbcSubScan(String sql, JdbcStoragePlugin plugin) {
+ super("");
+ this.sql = sql;
+ this.plugin = plugin;
+ }
+
+ @Override
+ public int getOperatorType() {
+ return -1;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public StoragePluginConfig getConfig() {
+ return plugin.getConfig();
+ }
+
+ @JsonIgnore
+ public JdbcStoragePlugin getPlugin() {
+ return plugin;
+ }
+
+} \ No newline at end of file
diff --git a/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
new file mode 100755
index 000000000..7d8805266
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,10 @@
+{
+ "storage":{
+ "jdbc" : {
+ type:"jdbc",
+ enabled: false,
+ driver:"org.apache.derby.jdbc.ClientDriver",
+ url:"jdbc:derby://localhost:20000/memory:testDB;"
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/resources/drill-module.conf b/contrib/storage-jdbc/src/main/resources/drill-module.conf
new file mode 100755
index 000000000..721a59950
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/resources/drill-module.conf
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
new file mode 100644
index 000000000..1f150682b
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
@@ -0,0 +1,181 @@
+package org.apache.drill.exec.store.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetAddress;
+import java.sql.Connection;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+public class TestJdbcPlugin extends PlanTestBase {
+
+ static NetworkServerControl server;
+
+ @BeforeClass
+ public static void setupDefaultTestCluster() throws Exception {
+ System.setProperty("derby.drda.startNetworkServer", "true");
+ server = new NetworkServerControl(InetAddress.getByName("localhost"),
+ 20000,
+ "admin",
+ "admin");
+ java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true);
+ server.start(consoleWriter);
+
+ BasicDataSource source = new BasicDataSource();
+ source.setUrl("jdbc:derby://localhost:20000/memory:testDB;create=true");
+ source.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
+
+ final String insertValues1 = "INSERT INTO person VALUES (1, 'Smith', null, '{number:\"123 Main\"}','mtrx', "
+ + "'xy', 333.333, 444.444, 555.00, TIME('15:09:02'), DATE('1994-02-23'), TIMESTAMP('1962-09-23 03:23:34.234'),"
+ + " 666.66, 1, -1, false)";
+ final String insertValues2 = "INSERT INTO person (PersonId) VALUES (null)";
+ try (Connection c = source.getConnection()) {
+ c.createStatement().execute("CREATE TABLE person\n" +
+ "(\n" +
+ "PersonID int,\n" +
+ "LastName varchar(255),\n" +
+ "FirstName varchar(255),\n" +
+ "Address varchar(255),\n" +
+ "City varchar(255),\n" +
+ "Code char(2),\n" +
+ "dbl double,\n" +
+ "flt float,\n" +
+ "rel real,\n" +
+ "tm time,\n" +
+ "dt date,\n" +
+ "tms timestamp,\n" +
+ "num numeric(10,2), \n" +
+ "sm smallint,\n" +
+ "bi bigint,\n" +
+ "bool boolean\n" +
+
+ ")");
+
+ c.createStatement().execute(insertValues1);
+ c.createStatement().execute(insertValues2);
+ c.createStatement().execute(insertValues1);
+ }
+
+ BaseTestQuery.setupDefaultTestCluster();
+ }
+
+ @AfterClass
+ public static void shutdownDb() throws Exception {
+ server.shutdown();
+ }
+
+ @Test
+ public void validateResult() throws Exception {
+ // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support.
+ testBuilder()
+ .sqlQuery(
+ "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON")
+ .ordered()
+ .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL",
+ "NUM", "SM", "BI", "BOOL")
+ .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00,
+ 666.66, 1, -1l, false)
+ .baselineValues(null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00,
+ 666.66, 1, -1l, false)
+ .build().run();
+ }
+
+ @Test
+ public void queryDefaultSchema() throws Exception {
+ testNoResult("select * from testdb.PERSON");
+ }
+
+ @Test
+ public void queryDifferentCase() throws Exception {
+ testNoResult("select * from testdb.person");
+ }
+
+ @Test
+ public void pushdownJoin() throws Exception {
+ testNoResult("use testdb");
+ String query = "select x.PersonId from (select PersonId from person)x "
+ + "join (select PersonId from person)y on x.PersonId = y.PersonId ";
+ testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join" });
+
+ }
+
+ @Test
+ public void pushdownJoinAndFilterPushDown() throws Exception {
+ final String query = "select * from \n" +
+ "testdb.`default`.PERSON e\n" +
+ "INNER JOIN \n" +
+ "testdb.`default`.PERSON s\n" +
+ "ON e.FirstName = s.FirstName\n" +
+ "WHERE e.LastName > 'hello'";
+
+ testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });
+ }
+
+ @Test
+ public void pushdownAggregation() throws Exception {
+ final String query = "select count(*) from \n" +
+ "testdb.`default`.PERSON";
+
+ testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" });
+ }
+
+ @Test
+ public void pushdownDoubleJoinAndFilter() throws Exception {
+ final String query = "select * from \n" +
+ "testdb.`default`.PERSON e\n" +
+ "INNER JOIN \n" +
+ "testdb.`default`.PERSON s\n" +
+ "ON e.PersonId = s.PersonId\n" +
+ "INNER JOIN \n" +
+ "testdb.`default`.PERSON ed\n" +
+ "ON e.PersonId = ed.PersonId\n" +
+ "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'";
+ testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });
+ }
+
+ @Test
+ public void showTablesDefaultSchema() throws Exception {
+ testNoResult("use testdb");
+ assertEquals(1, testRunAndPrint(QueryType.SQL, "show tables like 'PERSON'"));
+ }
+
+ @Test
+ public void describe() throws Exception {
+ testNoResult("use testdb");
+ assertEquals(16, testRunAndPrint(QueryType.SQL, "describe PERSON"));
+ }
+
+ @Test
+ public void ensureDrillFunctionsAreNotPushedDown() throws Exception {
+ // This should verify that we're not trying to push CONVERT_FROM into the JDBC storage plugin. If were pushing
+ // this function down, the SQL query would fail.
+ testNoResult("select CONVERT_FROM(Address, 'JSON') from testdb.person where PersonId = 1");
+ }
+
+ @Test
+ public void pushdownFilter() throws Exception {
+ testNoResult("use testdb");
+ String query = "select * from person where PersonId = 1";
+ testPlanMatchingPatterns(query, new String[] {}, new String[] { "Filter" });
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
new file mode 100755
index 000000000..200ab93e4
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,10 @@
+{
+ "storage":{
+ testdb : {
+ type:"jdbc",
+ enabled: true,
+ driver:"org.apache.derby.jdbc.ClientDriver",
+ url:"jdbc:derby://localhost:20000/memory:testDB;"
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/resources/logback.xml b/contrib/storage-jdbc/src/test/resources/logback.xml
new file mode 100644
index 000000000..5facafe61
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/resources/logback.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+ <Compressing>true</Compressing>
+ <ReconnectionDelay>10000</ReconnectionDelay>
+ <IncludeCallerData>true</IncludeCallerData>
+ <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ <level value="warn" />
+ </appender>
+
+ <logger name="org.apache.drill" additivity="false">
+ <level value="debug" />
+ <appender-ref ref="SOCKET" />
+<!-- <appender-ref ref="STDOUT" /> -->
+ </logger>
+
+ <root>
+ <level value="debug" />
+ <appender-ref ref="SOCKET" />
+<!-- <appender-ref ref="STDOUT" /> -->
+ </root>
+
+</configuration>