diff options
author | Jacques Nadeau <jacques@apache.org> | 2015-08-01 18:11:51 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-09-13 18:26:44 -0700 |
commit | e12cd470e4ab57b025840fdfa200a051a01df029 (patch) | |
tree | 09563622a7a4ccfdd21a6dc29a8927b7c28e3106 /contrib/storage-jdbc/src | |
parent | 8478e9fb1d7e6881d8f092ae9ff3a338f2e023a6 (diff) |
DRILL-3180: JDBC Storage Plugin updates.
- Move to leverage Calcite's JDBC adapter capabilities for pushdowns, schema, etc.
- Add test cases using Derby
Diffstat (limited to 'contrib/storage-jdbc/src')
16 files changed, 1926 insertions, 0 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java new file mode 100644 index 000000000..bbb4daf6c --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcRules; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexNode; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +abstract class DrillJdbcRuleBase extends ConverterRule { + + protected final LoadingCache<RexNode, Boolean> checkedExpressions = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(10, TimeUnit.MINUTES) + .build( + new CacheLoader<RexNode, Boolean>() { + public Boolean load(RexNode expr) { + return JdbcExpressionCheck.isOnlyStandardExpressions(expr); + } + }); + + protected final JdbcConvention out; + + private DrillJdbcRuleBase(Class<? extends RelNode> clazz, RelTrait in, JdbcConvention out, String description) { + super(clazz, in, out, description); + this.out = out; + } + + static class DrillJdbcProjectRule extends DrillJdbcRuleBase { + + public DrillJdbcProjectRule(JdbcConvention out) { + super(LogicalProject.class, Convention.NONE, out, "JdbcProjectRule"); + } + + public RelNode convert(RelNode rel) { + LogicalProject project = (LogicalProject) rel; + return new JdbcRules.JdbcProject(rel.getCluster(), rel.getTraitSet().replace(this.out), convert( + project.getInput(), project.getInput().getTraitSet().replace(this.out)), project.getProjects(), + project.getRowType()); + } + + @Override + public boolean matches(RelOptRuleCall call) { + try { + + final LogicalProject project = (LogicalProject) call.rel(0); + for (RexNode node : project.getChildExps()) { + if (!checkedExpressions.get(node)) { + return false; + } + } + return true; + + } catch (ExecutionException e) { + throw new IllegalStateException("Failure while trying to evaluate pushdown.", e); + } + } + } + + static class DrillJdbcFilterRule extends DrillJdbcRuleBase { + + public DrillJdbcFilterRule(JdbcConvention out) { + super(LogicalFilter.class, Convention.NONE, out, "DrillJdbcFilterRule"); + } + + public RelNode convert(RelNode rel) { + LogicalFilter filter = (LogicalFilter) rel; + + return new JdbcRules.JdbcFilter(rel.getCluster(), rel.getTraitSet().replace(this.out), convert(filter.getInput(), + filter.getInput().getTraitSet().replace(this.out)), filter.getCondition()); + } + + @Override + public boolean matches(RelOptRuleCall call) { + try { + + final LogicalFilter filter = (LogicalFilter) call.rel(0); + for (RexNode node : filter.getChildExps()) { + if (!checkedExpressions.get(node)) { + return false; + } + } + return true; + + } catch (ExecutionException e) { + throw new IllegalStateException("Failure while trying to evaluate pushdown.", e); + } + } + } + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java new file mode 100755 index 000000000..fa44b5505 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.util.Collections; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +//import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; + +import com.google.common.base.Preconditions; + +public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> { + @Override + public ScanBatch getBatch(FragmentContext context, JdbcSubScan config, + List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + JdbcStoragePlugin plugin = config.getPlugin(); + RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName()); + return new ScanBatch(config, context, Collections.singletonList(reader).iterator()); + } +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java new file mode 100644 index 000000000..52dd29f77 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.logical.DrillImplementor; +import org.apache.drill.exec.planner.logical.DrillRel; + +public class JdbcDrel extends SingleRel implements DrillRel { + + public JdbcDrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) { + super(cluster, traits, child); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new JdbcDrel(getCluster(), traitSet, inputs.iterator().next()); + } + + @Override + protected Object clone() throws CloneNotSupportedException { + return copy(getTraitSet(), getInputs()); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + throw new UnsupportedOperationException(); + } + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java new file mode 100644 index 000000000..2015a7754 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexFieldCollation; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexWindow; +import org.apache.drill.exec.planner.sql.DrillSqlOperator; + +/** + * Visitor class that determines whether or not a particular RexNode expression tree contains only standard expressions. + * If RexNode tree contains Drill specific expressions, the tree will respond with false. + */ +class JdbcExpressionCheck implements RexVisitor<Boolean> { + + private static final JdbcExpressionCheck INSTANCE = new JdbcExpressionCheck(); + + public static boolean isOnlyStandardExpressions(RexNode rex) { + return rex.accept(INSTANCE); + } + + @Override + public Boolean visitInputRef(RexInputRef paramRexInputRef) { + return true; + } + + @Override + public Boolean visitLocalRef(RexLocalRef paramRexLocalRef) { + return true; + } + + @Override + public Boolean visitLiteral(RexLiteral paramRexLiteral) { + return true; + } + + @Override + public Boolean visitCall(RexCall paramRexCall) { + if(paramRexCall.getOperator() instanceof DrillSqlOperator){ + return false; + }else{ + for (RexNode operand : paramRexCall.operands) { + if (!operand.accept(this)) { + return false; + } + } + } + return true; + } + + public Boolean visitOver(RexOver over) { + if (!visitCall(over)) { + return false; + } + ; + + final RexWindow window = over.getWindow(); + for (RexFieldCollation orderKey : window.orderKeys) { + if (!((RexNode) orderKey.left).accept(this)) { + return false; + } + } + + for (RexNode partitionKey : window.partitionKeys) { + if (!partitionKey.accept(this)) { + return false; + } + } + + return true; + + } + + @Override + public Boolean visitCorrelVariable(RexCorrelVariable paramRexCorrelVariable) { + return true; + } + + @Override + public Boolean visitDynamicParam(RexDynamicParam paramRexDynamicParam) { + return true; + } + + @Override + public Boolean visitRangeRef(RexRangeRef paramRexRangeRef) { + return true; + } + + @Override + public Boolean visitFieldAccess(RexFieldAccess paramRexFieldAccess) { + return paramRexFieldAccess.getReferenceExpr().accept(this); + } + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java new file mode 100644 index 000000000..95b03cf50 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("jdbc-scan") +public class JdbcGroupScan extends AbstractGroupScan { + + private final String sql; + private final JdbcStoragePlugin plugin; + private final double rows; + + @JsonCreator + public JdbcGroupScan( + @JsonProperty("sql") String sql, + @JsonProperty("config") StoragePluginConfig config, + @JsonProperty("rows") double rows, + @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException { + super(""); + this.sql = sql; + this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config); + this.rows = rows; + } + + JdbcGroupScan(String sql, JdbcStoragePlugin plugin, double rows) { + super(""); + this.sql = sql; + this.plugin = plugin; + this.rows = rows; + } + + @Override + public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { + } + + @Override + public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException { + return new JdbcSubScan(sql, plugin); + } + + @Override + public int getMaxParallelizationWidth() { + return 1; + } + + @Override + public ScanStats getScanStats() { + return new ScanStats( + GroupScanProperty.NO_EXACT_ROW_COUNT, + (long) Math.max(rows, 1), + 1, + 1); + } + + public String getSql() { + return sql; + } + + @Override + public String getDigest() { + return sql + String.valueOf(plugin.getConfig()); + } + + public StoragePluginConfig getConfig() { + return plugin.getConfig(); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + return new JdbcGroupScan(sql, plugin, rows); + } + + + +}
\ No newline at end of file diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java new file mode 100644 index 000000000..0adb5e0b0 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.io.IOException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.SinglePrel; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; + +/** + * Prel used to represent a JDBC Conversion within an expression tree. This Prel will replaced with a full JdbcPrel + * before execution can happen. + */ +public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable { + + public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) { + super(cluster, traits, child); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new JdbcIntermediatePrel(getCluster(), traitSet, getInput()); + } + + @Override + protected Object clone() throws CloneNotSupportedException { + return copy(getTraitSet(), getInputs()); + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + @Override + public Prel finalizeRel() { + return new JdbcPrel(getCluster(), getTraitSet(), this); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor."); + } + + @Override + public boolean needsFinalColumnReordering() { + return false; + } + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java new file mode 100644 index 000000000..2433fbd3d --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.jdbc.JdbcImplementor; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.sql.SqlDialect; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin.DrillJdbcConvention; + +/** + * Represents a JDBC Plan once the children nodes have been rewritten into SQL. + */ +public class JdbcPrel extends AbstractRelNode implements Prel { + + private final String sql; + private final double rows; + private final DrillJdbcConvention convention; + + public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel) { + super(cluster, traitSet); + final RelNode input = prel.getInput(); + rows = input.getRows(); + convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE); + + // generate sql for tree. + final SqlDialect dialect = convention.getPlugin().getDialect(); + final JdbcImplementor jdbcImplementor = new JdbcImplementor( + dialect, + (JavaTypeFactory) getCluster().getTypeFactory()); + final JdbcImplementor.Result result = + jdbcImplementor.visitChild(0, input.accept(new SubsetRemover())); + sql = result.asQuery().toSqlString(dialect).getSql(); + rowType = input.getRowType(); + } + + private class SubsetRemover extends RelShuttleImpl { + + @Override + public RelNode visit(RelNode other) { + if (other instanceof RelSubset) { + return ((RelSubset) other).getBest().accept(this); + } else { + return super.visit(other); + } + } + + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + return new JdbcGroupScan(sql, convention.getPlugin(), rows); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("sql", sql); + } + + @Override + public double getRows() { + return rows; + } + + @Override + public Iterator<Prel> iterator() { + return Collections.emptyIterator(); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + @Override + public boolean needsFinalColumnReordering() { + return false; + } +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java new file mode 100755 index 000000000..69c45c2f4 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -0,0 +1,431 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; + +import javax.sql.DataSource; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.NullableDateVector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableTimeVector; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +@SuppressWarnings("unchecked") +class JdbcRecordReader extends AbstractRecordReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(JdbcRecordReader.class); + + private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS; + private final DataSource source; + private ResultSet resultSet; + private final String storagePluginName; + private FragmentContext fragmentContext; + private Connection connection; + private Statement statement; + private final String sql; + private ImmutableList<ValueVector> vectors; + private ImmutableList<Copier<?>> copiers; + + private OperatorContext operatorContext; + + public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) { + this.fragmentContext = fragmentContext; + this.source = source; + this.sql = sql; + this.storagePluginName = storagePluginName; + } + + static { + JDBC_TYPE_MAPPINGS = (ImmutableMap<Integer, MinorType>) (Object) ImmutableMap.builder() + .put(java.sql.Types.DOUBLE, MinorType.FLOAT8) + .put(java.sql.Types.FLOAT, MinorType.FLOAT4) + .put(java.sql.Types.TINYINT, MinorType.INT) + .put(java.sql.Types.SMALLINT, MinorType.INT) + .put(java.sql.Types.INTEGER, MinorType.INT) + .put(java.sql.Types.BIGINT, MinorType.BIGINT) + + .put(java.sql.Types.CHAR, MinorType.VARCHAR) + .put(java.sql.Types.VARCHAR, MinorType.VARCHAR) + .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR) + + .put(java.sql.Types.NCHAR, MinorType.VARCHAR) + .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR) + .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR) + + .put(java.sql.Types.VARBINARY, MinorType.VARBINARY) + .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY) + + .put(java.sql.Types.NUMERIC, MinorType.FLOAT8) + .put(java.sql.Types.DECIMAL, MinorType.FLOAT8) + .put(java.sql.Types.REAL, MinorType.FLOAT8) + + .put(java.sql.Types.DATE, MinorType.DATE) + .put(java.sql.Types.TIME, MinorType.TIME) + .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP) + + .put(java.sql.Types.BOOLEAN, MinorType.BIT) + + .build(); + } + + private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) { + + if (v instanceof NullableBigIntVector) { + return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator()); + } else if (v instanceof NullableFloat4Vector) { + return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator()); + } else if (v instanceof NullableFloat8Vector) { + return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator()); + } else if (v instanceof NullableIntVector) { + return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator()); + } else if (v instanceof NullableVarCharVector) { + return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator()); + } else if (v instanceof NullableVarBinaryVector) { + return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator()); + } else if (v instanceof NullableDateVector) { + return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator()); + } else if (v instanceof NullableTimeVector) { + return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator()); + } else if (v instanceof NullableTimeStampVector) { + return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator()); + } else if (v instanceof NullableBitVector) { + return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator()); + } + + throw new IllegalArgumentException("Unknown how to handle vector."); + } + + @Override + public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { + try { + + this.operatorContext = operatorContext; + connection = source.getConnection(); + statement = connection.createStatement(); + resultSet = statement.executeQuery(sql); + + final ResultSetMetaData meta = resultSet.getMetaData(); + final int columns = meta.getColumnCount(); + ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder(); + ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder(); + + for (int i = 1; i <= columns; i++) { + final String name = meta.getColumnLabel(i); + final int jdbcType = meta.getColumnType(i); + final int width = meta.getPrecision(i); + final int scale = meta.getScale(i); + MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType); + if (minorType == null) { + throw UserException.dataReadError() + .message("The JDBC storage plugin failed while trying to execute a query. " + + "The JDBC data type %d is not currently supported.", jdbcType) + + .addContext("sql", sql) + .addContext("plugin", storagePluginName) + .build(logger); + } + + final MajorType type = Types.optional(minorType); + final MaterializedField field = MaterializedField.create(name, type); + final Class<? extends ValueVector> clazz = (Class<? extends ValueVector>) TypeHelper.getValueVectorClass( + minorType, type.getMode()); + ValueVector vector = output.addField(field, clazz); + vectorBuilder.add(vector); + copierBuilder.add(getCopier(jdbcType, i, resultSet, vector)); + + } + + vectors = vectorBuilder.build(); + copiers = copierBuilder.build(); + + } catch (SQLException | SchemaChangeException e) { + throw UserException.dataReadError(e) + .message("The JDBC storage plugin failed while trying setup the SQL query. ") + .addContext("sql", sql) + .addContext("plugin", storagePluginName) + .build(logger); + } + } + + + @Override + public int next() { + int counter = 0; + Boolean b = true; + try { + while (counter < 4095 && b == true) { // loop at 4095 since nullables use one more than record count and we + // allocate on powers of two. + b = resultSet.next(); + if(b == false) { + break; + } + for (Copier<?> c : copiers) { + c.copy(counter); + } + counter++; + } + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failure while attempting to read from database.") + .addContext("sql", sql) + .addContext("plugin", storagePluginName) + .build(logger); + } + + for (ValueVector vv : vectors) { + vv.getMutator().setValueCount(counter > 0 ? counter : 0); + } + + return counter>0 ? counter : 0; + } + + @Override + public void cleanup() { + AutoCloseables.close(resultSet, logger); + AutoCloseables.close(statement, logger); + AutoCloseables.close(connection, logger); + } + + private abstract class Copier<T extends ValueVector.Mutator> { + protected final int columnIndex; + protected final ResultSet result; + protected final T mutator; + + public Copier(int columnIndex, ResultSet result, T mutator) { + super(); + this.columnIndex = columnIndex; + this.result = result; + this.mutator = mutator; + } + + abstract void copy(int index) throws SQLException; + } + + private class IntCopier extends Copier<NullableIntVector.Mutator> { + public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) { + super(offset, set, mutator); + } + + @Override + void copy(int index) throws SQLException { + mutator.setSafe(index, result.getInt(columnIndex)); + if (result.wasNull()) { + mutator.setNull(index); + } + } + } + + private class BigIntCopier extends Copier<NullableBigIntVector.Mutator> { + public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) { + super(offset, set, mutator); + } + + @Override + void copy(int index) throws SQLException { + mutator.setSafe(index, result.getLong(columnIndex)); + if (result.wasNull()) { + mutator.setNull(index); + } + } + + } + + private class Float4Copier extends Copier<NullableFloat4Vector.Mutator> { + + public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + mutator.setSafe(index, result.getFloat(columnIndex)); + if (result.wasNull()) { + mutator.setNull(index); + } + } + + } + + + private class Float8Copier extends Copier<NullableFloat8Vector.Mutator> { + + public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + mutator.setSafe(index, result.getDouble(columnIndex)); + if (result.wasNull()) { + mutator.setNull(index); + } + + } + + } + + private class DecimalCopier extends Copier<NullableFloat8Vector.Mutator> { + + public DecimalCopier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + BigDecimal decimal = result.getBigDecimal(columnIndex); + if (decimal != null) { + mutator.setSafe(index, decimal.doubleValue()); + } + } + + } + + private class VarCharCopier extends Copier<NullableVarCharVector.Mutator> { + + public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + String val = resultSet.getString(columnIndex); + if (val != null) { + byte[] record = val.getBytes(Charsets.UTF_8); + mutator.setSafe(index, record, 0, record.length); + } + } + + } + + private class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> { + + public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + byte[] record = result.getBytes(columnIndex); + if (record != null) { + mutator.setSafe(index, record, 0, record.length); + } + } + + } + + private class DateCopier extends Copier<NullableDateVector.Mutator> { + + public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + Date date = result.getDate(columnIndex); + if (date != null) { + mutator.setSafe(index, date.getTime()); + } + } + + } + + private class TimeCopier extends Copier<NullableTimeVector.Mutator> { + + public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + Time time = result.getTime(columnIndex); + if (time != null) { + mutator.setSafe(index, (int) time.getTime()); + } + + } + + } + + private class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> { + + public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + Timestamp stamp = result.getTimestamp(columnIndex); + if (stamp != null) { + mutator.setSafe(index, stamp.getTime()); + } + + } + + } + + private class BitCopier extends Copier<NullableBitVector.Mutator> { + + public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) { + super(columnIndex, result, mutator); + } + + @Override + void copy(int index) throws SQLException { + mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0); + if (result.wasNull()) { + mutator.setNull(index); + } + } + + } + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java new file mode 100755 index 000000000..5a921d400 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import org.apache.drill.common.logical.StoragePluginConfig; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(JdbcStorageConfig.NAME) +public class JdbcStorageConfig extends StoragePluginConfig { + + public static final String NAME = "jdbc"; + + private final String driver; + private final String url; + private final String username; + private final String password; + + @JsonCreator + public JdbcStorageConfig( + @JsonProperty("driver") String driver, + @JsonProperty("url") String url, + @JsonProperty("username") String username, + @JsonProperty("password") String password) { + super(); + this.driver = driver; + this.url = url; + this.username = username; + this.password = password; + } + + public String getDriver() { + return driver; + } + + public String getUrl() { + return url; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((driver == null) ? 0 : driver.hashCode()); + result = prime * result + ((password == null) ? 0 : password.hashCode()); + result = prime * result + ((url == null) ? 0 : url.hashCode()); + result = prime * result + ((username == null) ? 0 : username.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JdbcStorageConfig other = (JdbcStorageConfig) obj; + if (driver == null) { + if (other.driver != null) { + return false; + } + } else if (!driver.equals(other.driver)) { + return false; + } + if (password == null) { + if (other.password != null) { + return false; + } + } else if (!password.equals(other.password)) { + return false; + } + if (url == null) { + if (other.url != null) { + return false; + } + } else if (!url.equals(other.url)) { + return false; + } + if (username == null) { + if (other.username != null) { + return false; + } + } else if (!username.equals(other.username)) { + return false; + } + return true; + } + + +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java new file mode 100755 index 000000000..f27f6f106 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.sql.DataSource; + +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcRules; +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin; +import org.apache.calcite.adapter.jdbc.JdbcSchema; +import org.apache.calcite.linq4j.tree.ConstantUntypedNull; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.rules.FilterSetOpTransposeRule; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlDialect; +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcFilterRule; +import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcProjectRule; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +public class JdbcStoragePlugin extends AbstractStoragePlugin { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JdbcStoragePlugin.class); + + // Rules from Calcite's JdbcRules class that we want to avoid using. + private static String[] RULES_TO_AVOID = { + "JdbcToEnumerableConverterRule", "JdbcFilterRule", "JdbcProjectRule" + }; + + + private final JdbcStorageConfig config; + private final DrillbitContext context; + private final DataSource source; + private final String name; + private final SqlDialect dialect; + private final DrillJdbcConvention convention; + + + public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) { + this.context = context; + this.config = config; + this.name = name; + BasicDataSource source = new BasicDataSource(); + source.setDriverClassName(config.getDriver()); + source.setUrl(config.getUrl()); + + if (config.getUsername() != null) { + source.setUsername(config.getUsername()); + } + + if (config.getPassword() != null) { + source.setPassword(config.getPassword()); + } + + this.source = source; + this.dialect = JdbcSchema.createDialect(source); + this.convention = new DrillJdbcConvention(dialect, name); + } + + + class DrillJdbcConvention extends JdbcConvention { + + private final ImmutableSet<RelOptRule> rules; + + public DrillJdbcConvention(SqlDialect dialect, String name) { + super(dialect, ConstantUntypedNull.INSTANCE, name); + + + // build rules for this convention. + ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder(); + + builder.add(JDBC_PRULE_INSTANCE); + builder.add(new JdbcDrelConverterRule(this)); + builder.add(new DrillJdbcProjectRule(this)); + builder.add(new DrillJdbcFilterRule(this)); + + outside: for (RelOptRule rule : JdbcRules.rules(this)) { + final String description = rule.toString(); + + // we want to black list some rules but the parent Calcite package is all or none. + // Therefore, we remove rules with names we don't like. + for(String black : RULES_TO_AVOID){ + if(description.equals(black)){ + continue outside; + } + + } + + builder.add(rule); + } + + builder.add(FilterSetOpTransposeRule.INSTANCE); + builder.add(ProjectRemoveRule.INSTANCE); + + rules = builder.build(); + } + + @Override + public void register(RelOptPlanner planner) { + for (RelOptRule rule : rules) { + planner.addRule(rule); + } + } + + public Set<RelOptRule> getRules() { + return rules; + } + + public JdbcStoragePlugin getPlugin() { + return JdbcStoragePlugin.this; + } + } + + /** + * Returns whether a condition is supported by {@link JdbcJoin}. + * + * <p>Corresponds to the capabilities of + * {@link JdbcJoin#convertConditionToSqlNode}. + * + * @param node Condition + * @return Whether condition is supported + */ + private static boolean canJoinOnCondition(RexNode node) { + final List<RexNode> operands; + switch (node.getKind()) { + case AND: + case OR: + operands = ((RexCall) node).getOperands(); + for (RexNode operand : operands) { + if (!canJoinOnCondition(operand)) { + return false; + } + } + return true; + + case EQUALS: + case IS_NOT_DISTINCT_FROM: + case NOT_EQUALS: + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + operands = ((RexCall) node).getOperands(); + if ((operands.get(0) instanceof RexInputRef) + && (operands.get(1) instanceof RexInputRef)) { + return true; + } + // fall through + + default: + return false; + } + } + + + private static final JdbcPrule JDBC_PRULE_INSTANCE = new JdbcPrule(); + + private static class JdbcPrule extends ConverterRule { + + private JdbcPrule() { + super(JdbcDrel.class, DrillRel.DRILL_LOGICAL, Prel.DRILL_PHYSICAL, "JDBC_PREL_Converter"); + } + + @Override + public RelNode convert(RelNode in) { + + return new JdbcIntermediatePrel( + in.getCluster(), + in.getTraitSet().replace(getOutTrait()), + in.getInput(0)); + } + + } + + private class JdbcDrelConverterRule extends ConverterRule { + + public JdbcDrelConverterRule(DrillJdbcConvention in) { + super(RelNode.class, in, DrillRel.DRILL_LOGICAL, "JDBC_DREL_Converter" + in.getName()); + } + + @Override + public RelNode convert(RelNode in) { + return new JdbcDrel(in.getCluster(), in.getTraitSet().replace(DrillRel.DRILL_LOGICAL), + convert(in, in.getTraitSet().replace(this.getInTrait()))); + } + + } + + private class CapitalizingJdbcSchema extends AbstractSchema { + + private final JdbcSchema inner; + + public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource, + SqlDialect dialect, JdbcConvention convention, String catalog, String schema) { + super(parentSchemaPath, name); + inner = new JdbcSchema(dataSource, dialect, convention, catalog, schema); + } + + @Override + public String getTypeName() { + return JdbcStorageConfig.NAME; + } + + @Override + public Collection<Function> getFunctions(String name) { + return inner.getFunctions(name); + } + + @Override + public Set<String> getFunctionNames() { + return inner.getFunctionNames(); + } + + @Override + public Schema getSubSchema(String name) { + return inner.getSubSchema(name); + } + + @Override + public Set<String> getSubSchemaNames() { + return inner.getSubSchemaNames(); + } + + @Override + public Set<String> getTableNames() { + return inner.getTableNames(); + } + + @Override + public Table getTable(String name) { + Table table = inner.getTable(name); + if (table != null) { + return table; + } + return inner.getTable(name.toUpperCase()); + + } + + } + + private class JdbcCatalogSchema extends AbstractSchema { + + private final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap(); + private final CapitalizingJdbcSchema defaultSchema; + + public JdbcCatalogSchema(String name) { + super(ImmutableList.<String> of(), name); + + try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) { + while (set.next()) { + final String catalogName = set.getString(1); + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect, + convention, catalogName, null); + schemaMap.put(catalogName, schema); + } + } catch (SQLException e) { + logger.warn("Failure while attempting to load JDBC schema.", e); + } + + // unable to read general catalog + if (schemaMap.isEmpty()) { + schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect, + convention, + null, null)); + } + + defaultSchema = schemaMap.values().iterator().next(); + + } + + @Override + public String getTypeName() { + return JdbcStorageConfig.NAME; + } + + @Override + public Schema getDefaultSchema() { + return defaultSchema; + } + + @Override + public Schema getSubSchema(String name) { + return schemaMap.get(name); + } + + @Override + public Set<String> getSubSchemaNames() { + return schemaMap.keySet(); + } + + @Override + public Table getTable(String name) { + Schema schema = getDefaultSchema(); + if (schema != null) { + Table t = schema.getTable(name); + if (t != null) { + return t; + } + return schema.getTable(name.toUpperCase()); + } else { + return null; + } + } + + @Override + public Set<String> getTableNames() { + return defaultSchema.getTableNames(); + } + + } + + @Override + public void registerSchemas(SchemaConfig config, SchemaPlus parent) { + JdbcCatalogSchema schema = new JdbcCatalogSchema(name); + parent.add(name, schema); + } + + @Override + public JdbcStorageConfig getConfig() { + return config; + } + + public DrillbitContext getContext() { + return this.context; + } + + public String getName() { + return this.name; + } + + @Override + public boolean supportsRead() { + return true; + } + + public DataSource getSource() { + return source; + } + + public SqlDialect getDialect() { + return dialect; + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Set<RelOptRule> getOptimizerRules(OptimizerRulesContext context) { + return convention.getRules(); + } +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java new file mode 100755 index 000000000..fcafd4c27 --- /dev/null +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.jdbc; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractSubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("jdbc-sub-scan") +public class JdbcSubScan extends AbstractSubScan { + + private final String sql; + private final JdbcStoragePlugin plugin; + + @JsonCreator + public JdbcSubScan( + @JsonProperty("sql") String sql, + @JsonProperty("config") StoragePluginConfig config, + @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException { + super(""); + this.sql = sql; + this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config); + } + + JdbcSubScan(String sql, JdbcStoragePlugin plugin) { + super(""); + this.sql = sql; + this.plugin = plugin; + } + + @Override + public int getOperatorType() { + return -1; + } + + public String getSql() { + return sql; + } + + public StoragePluginConfig getConfig() { + return plugin.getConfig(); + } + + @JsonIgnore + public JdbcStoragePlugin getPlugin() { + return plugin; + } + +}
\ No newline at end of file diff --git a/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json new file mode 100755 index 000000000..7d8805266 --- /dev/null +++ b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json @@ -0,0 +1,10 @@ +{ + "storage":{ + "jdbc" : { + type:"jdbc", + enabled: false, + driver:"org.apache.derby.jdbc.ClientDriver", + url:"jdbc:derby://localhost:20000/memory:testDB;" + } + } +} diff --git a/contrib/storage-jdbc/src/main/resources/drill-module.conf b/contrib/storage-jdbc/src/main/resources/drill-module.conf new file mode 100755 index 000000000..721a59950 --- /dev/null +++ b/contrib/storage-jdbc/src/main/resources/drill-module.conf @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. +// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java new file mode 100644 index 000000000..1f150682b --- /dev/null +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java @@ -0,0 +1,181 @@ +package org.apache.drill.exec.store.jdbc; + +import static org.junit.Assert.assertEquals; + +import java.net.InetAddress; +import java.sql.Connection; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.derby.drda.NetworkServerControl; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +public class TestJdbcPlugin extends PlanTestBase { + + static NetworkServerControl server; + + @BeforeClass + public static void setupDefaultTestCluster() throws Exception { + System.setProperty("derby.drda.startNetworkServer", "true"); + server = new NetworkServerControl(InetAddress.getByName("localhost"), + 20000, + "admin", + "admin"); + java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true); + server.start(consoleWriter); + + BasicDataSource source = new BasicDataSource(); + source.setUrl("jdbc:derby://localhost:20000/memory:testDB;create=true"); + source.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + + final String insertValues1 = "INSERT INTO person VALUES (1, 'Smith', null, '{number:\"123 Main\"}','mtrx', " + + "'xy', 333.333, 444.444, 555.00, TIME('15:09:02'), DATE('1994-02-23'), TIMESTAMP('1962-09-23 03:23:34.234')," + + " 666.66, 1, -1, false)"; + final String insertValues2 = "INSERT INTO person (PersonId) VALUES (null)"; + try (Connection c = source.getConnection()) { + c.createStatement().execute("CREATE TABLE person\n" + + "(\n" + + "PersonID int,\n" + + "LastName varchar(255),\n" + + "FirstName varchar(255),\n" + + "Address varchar(255),\n" + + "City varchar(255),\n" + + "Code char(2),\n" + + "dbl double,\n" + + "flt float,\n" + + "rel real,\n" + + "tm time,\n" + + "dt date,\n" + + "tms timestamp,\n" + + "num numeric(10,2), \n" + + "sm smallint,\n" + + "bi bigint,\n" + + "bool boolean\n" + + + ")"); + + c.createStatement().execute(insertValues1); + c.createStatement().execute(insertValues2); + c.createStatement().execute(insertValues1); + } + + BaseTestQuery.setupDefaultTestCluster(); + } + + @AfterClass + public static void shutdownDb() throws Exception { + server.shutdown(); + } + + @Test + public void validateResult() throws Exception { + // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support. + testBuilder() + .sqlQuery( + "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON") + .ordered() + .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL", + "NUM", "SM", "BI", "BOOL") + .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00, + 666.66, 1, -1l, false) + .baselineValues(null, null, null, null, null, null, null, null, null, null, null, null, null) + .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00, + 666.66, 1, -1l, false) + .build().run(); + } + + @Test + public void queryDefaultSchema() throws Exception { + testNoResult("select * from testdb.PERSON"); + } + + @Test + public void queryDifferentCase() throws Exception { + testNoResult("select * from testdb.person"); + } + + @Test + public void pushdownJoin() throws Exception { + testNoResult("use testdb"); + String query = "select x.PersonId from (select PersonId from person)x " + + "join (select PersonId from person)y on x.PersonId = y.PersonId "; + testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join" }); + + } + + @Test + public void pushdownJoinAndFilterPushDown() throws Exception { + final String query = "select * from \n" + + "testdb.`default`.PERSON e\n" + + "INNER JOIN \n" + + "testdb.`default`.PERSON s\n" + + "ON e.FirstName = s.FirstName\n" + + "WHERE e.LastName > 'hello'"; + + testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); + } + + @Test + public void pushdownAggregation() throws Exception { + final String query = "select count(*) from \n" + + "testdb.`default`.PERSON"; + + testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" }); + } + + @Test + public void pushdownDoubleJoinAndFilter() throws Exception { + final String query = "select * from \n" + + "testdb.`default`.PERSON e\n" + + "INNER JOIN \n" + + "testdb.`default`.PERSON s\n" + + "ON e.PersonId = s.PersonId\n" + + "INNER JOIN \n" + + "testdb.`default`.PERSON ed\n" + + "ON e.PersonId = ed.PersonId\n" + + "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'"; + testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); + } + + @Test + public void showTablesDefaultSchema() throws Exception { + testNoResult("use testdb"); + assertEquals(1, testRunAndPrint(QueryType.SQL, "show tables like 'PERSON'")); + } + + @Test + public void describe() throws Exception { + testNoResult("use testdb"); + assertEquals(16, testRunAndPrint(QueryType.SQL, "describe PERSON")); + } + + @Test + public void ensureDrillFunctionsAreNotPushedDown() throws Exception { + // This should verify that we're not trying to push CONVERT_FROM into the JDBC storage plugin. If were pushing + // this function down, the SQL query would fail. + testNoResult("select CONVERT_FROM(Address, 'JSON') from testdb.person where PersonId = 1"); + } + + @Test + public void pushdownFilter() throws Exception { + testNoResult("use testdb"); + String query = "select * from person where PersonId = 1"; + testPlanMatchingPatterns(query, new String[] {}, new String[] { "Filter" }); + } +} diff --git a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json new file mode 100755 index 000000000..200ab93e4 --- /dev/null +++ b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json @@ -0,0 +1,10 @@ +{ + "storage":{ + testdb : { + type:"jdbc", + enabled: true, + driver:"org.apache.derby.jdbc.ClientDriver", + url:"jdbc:derby://localhost:20000/memory:testDB;" + } + } +} diff --git a/contrib/storage-jdbc/src/test/resources/logback.xml b/contrib/storage-jdbc/src/test/resources/logback.xml new file mode 100644 index 000000000..5facafe61 --- /dev/null +++ b/contrib/storage-jdbc/src/test/resources/logback.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + + <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender"> + <Compressing>true</Compressing> + <ReconnectionDelay>10000</ReconnectionDelay> + <IncludeCallerData>true</IncludeCallerData> + <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + <level value="warn" /> + </appender> + + <logger name="org.apache.drill" additivity="false"> + <level value="debug" /> + <appender-ref ref="SOCKET" /> +<!-- <appender-ref ref="STDOUT" /> --> + </logger> + + <root> + <level value="debug" /> + <appender-ref ref="SOCKET" /> +<!-- <appender-ref ref="STDOUT" /> --> + </root> + +</configuration> |