diff options
author | Jacques Nadeau <jacques@apache.org> | 2015-10-28 17:20:51 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-11-01 18:53:57 -0800 |
commit | 22e5316317cf5b8caed7a869e40a012f18652254 (patch) | |
tree | fe3d7a458d32507d6e1062009147cf5d5c55a8ae /contrib/storage-jdbc/src | |
parent | 7f55051a3144f0d2dfc317d426788f95606972d1 (diff) |
DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby)
This closes #225
Diffstat (limited to 'contrib/storage-jdbc/src')
2 files changed, 78 insertions, 18 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java index f27f6f106..24d1f9d46 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java @@ -234,6 +234,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { private class CapitalizingJdbcSchema extends AbstractSchema { + final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap(); private final JdbcSchema inner; public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource, @@ -258,13 +259,21 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { } @Override - public Schema getSubSchema(String name) { - return inner.getSubSchema(name); + public CapitalizingJdbcSchema getSubSchema(String name) { + return schemaMap.get(name); + } + + void setHolder(SchemaPlus plusOfThis) { + for (String s : getSubSchemaNames()) { + CapitalizingJdbcSchema inner = getSubSchema(s); + SchemaPlus holder = plusOfThis.add(s, inner); + inner.setHolder(holder); + } } @Override public Set<String> getSubSchemaNames() { - return inner.getSubSchemaNames(); + return schemaMap.keySet(); } @Override @@ -295,25 +304,74 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) { while (set.next()) { final String catalogName = set.getString(1); - CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect, - convention, catalogName, null); + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema( + getSchemaPath(), catalogName, source, dialect, convention, catalogName, null); schemaMap.put(catalogName, schema); } } catch (SQLException e) { logger.warn("Failure while attempting to load JDBC schema.", e); } - // unable to read general catalog + // unable to read catalog list. if (schemaMap.isEmpty()) { - schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect, - convention, - null, null)); + + // try to add a list of schemas to the schema map. + boolean schemasAdded = addSchemas(); + + if (!schemasAdded) { + // there were no schemas, just create a default one (the jdbc system doesn't support catalogs/schemas). + schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect, + convention, null, null)); + } + } else { + // We already have catalogs. Add schemas in this context of their catalogs. + addSchemas(); } defaultSchema = schemaMap.values().iterator().next(); + + } + + void setHolder(SchemaPlus plusOfThis) { + for (String s : getSubSchemaNames()) { + CapitalizingJdbcSchema inner = getSubSchema(s); + SchemaPlus holder = plusOfThis.add(s, inner); + inner.setHolder(holder); + } } + private boolean addSchemas() { + boolean added = false; + try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getSchemas()) { + while (set.next()) { + final String schemaName = set.getString(1); + final String catalogName = set.getString(2); + + CapitalizingJdbcSchema parentSchema = schemaMap.get(catalogName); + if (parentSchema == null) { + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect, + convention, catalogName, schemaName); + + // if a catalog schema doesn't exist, we'll add this at the top level. + schemaMap.put(schemaName, schema); + } else { + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(parentSchema.getSchemaPath(), schemaName, + source, dialect, + convention, catalogName, schemaName); + parentSchema.schemaMap.put(schemaName, schema); + + } + added = true; + } + } catch (SQLException e) { + logger.warn("Failure while attempting to load JDBC schema.", e); + } + + return added; + } + + @Override public String getTypeName() { return JdbcStorageConfig.NAME; @@ -325,7 +383,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { } @Override - public Schema getSubSchema(String name) { + public CapitalizingJdbcSchema getSubSchema(String name) { return schemaMap.get(name); } @@ -358,9 +416,11 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { @Override public void registerSchemas(SchemaConfig config, SchemaPlus parent) { JdbcCatalogSchema schema = new JdbcCatalogSchema(name); - parent.add(name, schema); + SchemaPlus holder = parent.add(name, schema); + schema.setHolder(holder); } + @Override public JdbcStorageConfig getConfig() { return config; diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java index 1f150682b..2eb419cda 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java @@ -88,7 +88,7 @@ public class TestJdbcPlugin extends PlanTestBase { // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support. testBuilder() .sqlQuery( - "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON") + "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.PERSON") .ordered() .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL", "NUM", "SM", "BI", "BOOL") @@ -122,9 +122,9 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownJoinAndFilterPushDown() throws Exception { final String query = "select * from \n" + - "testdb.`default`.PERSON e\n" + + "testdb.PERSON e\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON s\n" + + "testdb.PERSON s\n" + "ON e.FirstName = s.FirstName\n" + "WHERE e.LastName > 'hello'"; @@ -134,7 +134,7 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownAggregation() throws Exception { final String query = "select count(*) from \n" + - "testdb.`default`.PERSON"; + "testdb.PERSON"; testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" }); } @@ -142,12 +142,12 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownDoubleJoinAndFilter() throws Exception { final String query = "select * from \n" + - "testdb.`default`.PERSON e\n" + + "testdb.PERSON e\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON s\n" + + "testdb.PERSON s\n" + "ON e.PersonId = s.PersonId\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON ed\n" + + "testdb.PERSON ed\n" + "ON e.PersonId = ed.PersonId\n" + "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'"; testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); |