diff options
author | Volodymyr Vysotskyi <vvovyk@gmail.com> | 2019-02-17 23:10:04 +0200 |
---|---|---|
committer | Gautam Parai <gparai@apache.org> | 2019-02-22 23:33:03 -0800 |
commit | 110c3578aa18f596278e251d700c8fa9ada8b0c4 (patch) | |
tree | f36d4aa11d5faf862b8d276b7651a8d181dcb304 /contrib | |
parent | a43839e2147c24700f8a331c6863566abed7a51e (diff) |
DRILL-6734: JDBC storage plugin returns null for fields without aliases
closes #1642
- Add output column names to JdbcRecordReader and use them for storing the results since column names in result set may differ when aliases aren't specified
Diffstat (limited to 'contrib')
7 files changed, 125 insertions, 41 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java index 76c06e71e..9073f4dd8 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java @@ -35,7 +35,8 @@ public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> { List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); JdbcStoragePlugin plugin = config.getPlugin(); - RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName()); + RecordReader reader = new JdbcRecordReader(plugin.getSource(), + config.getSql(), plugin.getName(), config.getColumns()); return new ScanBatch(config, context, Collections.singletonList(reader)); } } diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java index ed6b674c4..a98193939 100644 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; @@ -39,35 +38,39 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class JdbcGroupScan extends AbstractGroupScan { private final String sql; + private final List<String> columns; private final JdbcStoragePlugin plugin; private final double rows; @JsonCreator public JdbcGroupScan( @JsonProperty("sql") String sql, + @JsonProperty("columns") List<String> columns, @JsonProperty("config") StoragePluginConfig config, @JsonProperty("rows") double rows, @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException { super(""); this.sql = sql; + this.columns = columns; this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config); this.rows = rows; } - JdbcGroupScan(String sql, JdbcStoragePlugin plugin, double rows) { + JdbcGroupScan(String sql, List<String> columns, JdbcStoragePlugin plugin, double rows) { super(""); this.sql = sql; + this.columns = columns; this.plugin = plugin; this.rows = rows; } @Override - public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { + public void applyAssignments(List<DrillbitEndpoint> endpoints) { } @Override - public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException { - return new JdbcSubScan(sql, plugin); + public SubScan getSpecificScan(int minorFragmentId) { + return new JdbcSubScan(sql, columns, plugin); } @Override @@ -88,9 +91,13 @@ public class JdbcGroupScan extends AbstractGroupScan { return sql; } + public List<String> getColumns() { + return columns; + } + @Override public String getDigest() { - return sql + String.valueOf(plugin.getConfig()); + return sql + plugin.getConfig(); } public StoragePluginConfig getConfig() { @@ -98,10 +105,7 @@ public class JdbcGroupScan extends AbstractGroupScan { } @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { - return new JdbcGroupScan(sql, plugin, rows); + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + return new JdbcGroupScan(sql, columns, plugin, rows); } - - - -}
\ No newline at end of file +} diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java index 8221367e8..b8229402b 100644 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.jdbc; -import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -91,8 +90,8 @@ public class JdbcPrel extends AbstractRelNode implements Prel { } @Override - public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - JdbcGroupScan output = new JdbcGroupScan(sql, convention.getPlugin(), rows); + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) { + JdbcGroupScan output = new JdbcGroupScan(sql, rowType.getFieldNames(), convention.getPlugin(), rows); return creator.addMetadata(this, output); } diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java index cd732a61c..011c9bc58 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -28,12 +28,12 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.util.Calendar; +import java.util.List; import java.util.TimeZone; import javax.sql.DataSource; import org.apache.drill.common.AutoCloseables; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -63,7 +63,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; @SuppressWarnings("unchecked") class JdbcRecordReader extends AbstractRecordReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory .getLogger(JdbcRecordReader.class); private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS; @@ -75,11 +75,13 @@ class JdbcRecordReader extends AbstractRecordReader { private final String sql; private ImmutableList<ValueVector> vectors; private ImmutableList<Copier<?>> copiers; + private final List<String> columns; - public JdbcRecordReader(DataSource source, String sql, String storagePluginName) { + public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<String> columns) { this.source = source; this.sql = sql; this.storagePluginName = storagePluginName; + this.columns = columns; } static { @@ -180,22 +182,35 @@ class JdbcRecordReader extends AbstractRecordReader { } @Override - public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { + public void setup(OperatorContext operatorContext, OutputMutator output) { try { connection = source.getConnection(); statement = connection.createStatement(); resultSet = statement.executeQuery(sql); - final ResultSetMetaData meta = resultSet.getMetaData(); - final int columns = meta.getColumnCount(); + ResultSetMetaData meta = resultSet.getMetaData(); + int columnsCount = meta.getColumnCount(); + if (columns.size() != columnsCount) { + throw UserException + .validationError() + .message( + "Expected columns count differs from the returned one.\n" + + "Expected columns: %s\n" + + "Returned columns count: %s", + columns, columnsCount) + .addContext("sql", sql) + .addContext("plugin", storagePluginName) + .build(logger); + } ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder(); ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder(); - for (int i = 1; i <= columns; i++) { - final String name = meta.getColumnLabel(i); - final int jdbcType = meta.getColumnType(i); - final int width = meta.getPrecision(i); - final int scale = meta.getScale(i); + for (int i = 1; i <= columnsCount; i++) { + String name = columns.get(i - 1); + // column index in ResultSetMetaData starts from 1 + int jdbcType = meta.getColumnType(i); + int width = meta.getPrecision(i); + int scale = meta.getScale(i); MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType); if (minorType == null) { @@ -214,14 +229,14 @@ class JdbcRecordReader extends AbstractRecordReader { continue; } - final MajorType type = MajorType.newBuilder() + MajorType type = MajorType.newBuilder() .setMode(TypeProtos.DataMode.OPTIONAL) .setMinorType(minorType) .setScale(scale) .setPrecision(width) .build(); - final MaterializedField field = MaterializedField.create(name, type); - final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( + MaterializedField field = MaterializedField.create(name, type); + Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( minorType, type.getMode()); ValueVector vector = output.addField(field, clazz); vectorBuilder.add(vector); @@ -245,13 +260,11 @@ class JdbcRecordReader extends AbstractRecordReader { @Override public int next() { int counter = 0; - Boolean b = true; try { - while (counter < 4095 && b) { // loop at 4095 since nullables use one more than record count and we + while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we // allocate on powers of two. - b = resultSet.next(); - if (!b) { - break; + if (!resultSet.next()) { + break; } for (Copier<?> c : copiers) { c.copy(counter); diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java index 34d18273e..9bc6de891 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java @@ -29,25 +29,31 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import java.util.List; + @JsonTypeName("jdbc-sub-scan") public class JdbcSubScan extends AbstractSubScan { private final String sql; private final JdbcStoragePlugin plugin; + private final List<String> columns; @JsonCreator public JdbcSubScan( @JsonProperty("sql") String sql, + @JsonProperty("columns") List<String> columns, @JsonProperty("config") StoragePluginConfig config, @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException { super(""); this.sql = sql; + this.columns = columns; this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config); } - JdbcSubScan(String sql, JdbcStoragePlugin plugin) { + JdbcSubScan(String sql, List<String> columns, JdbcStoragePlugin plugin) { super(""); this.sql = sql; + this.columns = columns; this.plugin = plugin; } @@ -60,6 +66,10 @@ public class JdbcSubScan extends AbstractSubScan { return sql; } + public List<String> getColumns() { + return columns; + } + public StoragePluginConfig getConfig() { return plugin.getConfig(); } @@ -68,5 +78,4 @@ public class JdbcSubScan extends AbstractSubScan { public JdbcStoragePlugin getPlugin() { return plugin; } - -}
\ No newline at end of file +} diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java index a40eec2fa..1da9cf001 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java @@ -63,7 +63,9 @@ public class TestJdbcPluginWithH2IT extends ClusterTest { URL scriptFile = TestJdbcPluginWithH2IT.class.getClassLoader().getResource("h2-test-data.sql"); Assert.assertNotNull("Script for test tables generation 'h2-test-data.sql' " + "cannot be found in test resources", scriptFile); - RunScript.execute(connection, new FileReader(scriptFile.getFile())); + try (FileReader fileReader = new FileReader(scriptFile.getFile())) { + RunScript.execute(connection, fileReader); + } } startCluster(ClusterFixture.builder(dirTestWatcher)); diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java index 447e76a38..049ee6022 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java @@ -189,7 +189,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest { null, null, null, "XXX") - .go(); + .go(); } @Test @@ -204,7 +204,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest { @Test public void pushdownJoinAndFilterPushDown() throws Exception { - final String query = "select * from " + + String query = "select * from " + "mysql.`drill_mysql_test`.person e " + "INNER JOIN " + "mysql.`drill_mysql_test`.person s " + @@ -252,4 +252,60 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest { assertEquals(1, queryBuilder().sql("describe caseSensitiveTable").run().recordCount()); assertEquals(2, queryBuilder().sql("describe CASESENSITIVETABLE").run().recordCount()); } + + @Test // DRILL-6734 + public void testExpressionsWithoutAlias() throws Exception { + String query = "select count(*), 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2\n" + + "from mysql.`drill_mysql_test`.person"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2") + .baselineValues(4L, 88L, 1.618033988749895) + .go(); + } + + @Test // DRILL-6734 + public void testExpressionsWithoutAliasesPermutations() throws Exception { + String query = "select EXPR$1, EXPR$0, EXPR$2\n" + + "from (select 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2, count(*) from mysql.`drill_mysql_test`.person)"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("EXPR$1", "EXPR$0", "EXPR$2") + .baselineValues(1.618033988749895, 88L, 4L) + .go(); + } + + @Test // DRILL-6734 + public void testExpressionsWithAliases() throws Exception { + String query = "select person_id as ID, 1+1+2+3+5+8+13+21+34 as FIBONACCI_SUM, (1+sqrt(5))/2 as golden_ratio\n" + + "from mysql.`drill_mysql_test`.person limit 2"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("ID", "FIBONACCI_SUM", "golden_ratio") + .baselineValues(1, 88L, 1.618033988749895) + .baselineValues(2, 88L, 1.618033988749895) + .go(); + } + + @Test // DRILL-6893 + public void testJoinStar() throws Exception { + String query = "select * from (select person_id from mysql.`drill_mysql_test`.person) t1 join " + + "(select person_id from mysql.`drill_mysql_test`.person) t2 on t1.person_id = t2.person_id"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("person_id", "person_id0") + .baselineValues(1, 1) + .baselineValues(2, 2) + .baselineValues(3, 3) + .baselineValues(5, 5) + .go(); + } } |