about summary refs log tree commit diff
path: root/contrib/storage-jdbc/src
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2015-10-28 17:20:51 -0700
committerJacques Nadeau <jacques@apache.org>2015-11-01 18:53:57 -0800
commit22e5316317cf5b8caed7a869e40a012f18652254 (patch)
treefe3d7a458d32507d6e1062009147cf5d5c55a8ae /contrib/storage-jdbc/src
parent7f55051a3144f0d2dfc317d426788f95606972d1 (diff)
DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby)
This closes #225
Diffstat (limited to 'contrib/storage-jdbc/src')
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java82
-rw-r--r--contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java14
2 files changed, 78 insertions(+), 18 deletions(-)
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index f27f6f106..24d1f9d46 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -234,6 +234,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
private class CapitalizingJdbcSchema extends AbstractSchema {
+ final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap();
private final JdbcSchema inner;
public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
@@ -258,13 +259,21 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Schema getSubSchema(String name) {
- return inner.getSubSchema(name);
+ public CapitalizingJdbcSchema getSubSchema(String name) {
+ return schemaMap.get(name);
+ }
+
+ void setHolder(SchemaPlus plusOfThis) {
+ for (String s : getSubSchemaNames()) {
+ CapitalizingJdbcSchema inner = getSubSchema(s);
+ SchemaPlus holder = plusOfThis.add(s, inner);
+ inner.setHolder(holder);
+ }
}
@Override
public Set<String> getSubSchemaNames() {
- return inner.getSubSchemaNames();
+ return schemaMap.keySet();
}
@Override
@@ -295,25 +304,74 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) {
while (set.next()) {
final String catalogName = set.getString(1);
- CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect,
- convention, catalogName, null);
+ CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
+ getSchemaPath(), catalogName, source, dialect, convention, catalogName, null);
schemaMap.put(catalogName, schema);
}
} catch (SQLException e) {
logger.warn("Failure while attempting to load JDBC schema.", e);
}
- // unable to read general catalog
+ // unable to read catalog list.
if (schemaMap.isEmpty()) {
- schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
- convention,
- null, null));
+
+ // try to add a list of schemas to the schema map.
+ boolean schemasAdded = addSchemas();
+
+ if (!schemasAdded) {
+ // there were no schemas, just create a default one (the jdbc system doesn't support catalogs/schemas).
+ schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
+ convention, null, null));
+ }
+ } else {
+ // We already have catalogs. Add schemas in this context of their catalogs.
+ addSchemas();
}
defaultSchema = schemaMap.values().iterator().next();
+
+ }
+
+ void setHolder(SchemaPlus plusOfThis) {
+ for (String s : getSubSchemaNames()) {
+ CapitalizingJdbcSchema inner = getSubSchema(s);
+ SchemaPlus holder = plusOfThis.add(s, inner);
+ inner.setHolder(holder);
+ }
}
+ private boolean addSchemas() {
+ boolean added = false;
+ try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getSchemas()) {
+ while (set.next()) {
+ final String schemaName = set.getString(1);
+ final String catalogName = set.getString(2);
+
+ CapitalizingJdbcSchema parentSchema = schemaMap.get(catalogName);
+ if (parentSchema == null) {
+ CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect,
+ convention, catalogName, schemaName);
+
+ // if a catalog schema doesn't exist, we'll add this at the top level.
+ schemaMap.put(schemaName, schema);
+ } else {
+ CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(parentSchema.getSchemaPath(), schemaName,
+ source, dialect,
+ convention, catalogName, schemaName);
+ parentSchema.schemaMap.put(schemaName, schema);
+
+ }
+ added = true;
+ }
+ } catch (SQLException e) {
+ logger.warn("Failure while attempting to load JDBC schema.", e);
+ }
+
+ return added;
+ }
+
+
@Override
public String getTypeName() {
return JdbcStorageConfig.NAME;
@@ -325,7 +383,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Schema getSubSchema(String name) {
+ public CapitalizingJdbcSchema getSubSchema(String name) {
return schemaMap.get(name);
}
@@ -358,9 +416,11 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
@Override
public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
JdbcCatalogSchema schema = new JdbcCatalogSchema(name);
- parent.add(name, schema);
+ SchemaPlus holder = parent.add(name, schema);
+ schema.setHolder(holder);
}
+
@Override
public JdbcStorageConfig getConfig() {
return config;
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
index 1f150682b..2eb419cda 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
@@ -88,7 +88,7 @@ public class TestJdbcPlugin extends PlanTestBase {
// we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support.
testBuilder()
.sqlQuery(
- "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON")
+ "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.PERSON")
.ordered()
.baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL",
"NUM", "SM", "BI", "BOOL")
@@ -122,9 +122,9 @@ public class TestJdbcPlugin extends PlanTestBase {
@Test
public void pushdownJoinAndFilterPushDown() throws Exception {
final String query = "select * from \n" +
- "testdb.`default`.PERSON e\n" +
+ "testdb.PERSON e\n" +
"INNER JOIN \n" +
- "testdb.`default`.PERSON s\n" +
+ "testdb.PERSON s\n" +
"ON e.FirstName = s.FirstName\n" +
"WHERE e.LastName > 'hello'";
@@ -134,7 +134,7 @@ public class TestJdbcPlugin extends PlanTestBase {
@Test
public void pushdownAggregation() throws Exception {
final String query = "select count(*) from \n" +
- "testdb.`default`.PERSON";
+ "testdb.PERSON";
testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" });
}
@@ -142,12 +142,12 @@ public class TestJdbcPlugin extends PlanTestBase {
@Test
public void pushdownDoubleJoinAndFilter() throws Exception {
final String query = "select * from \n" +
- "testdb.`default`.PERSON e\n" +
+ "testdb.PERSON e\n" +
"INNER JOIN \n" +
- "testdb.`default`.PERSON s\n" +
+ "testdb.PERSON s\n" +
"ON e.PersonId = s.PersonId\n" +
"INNER JOIN \n" +
- "testdb.`default`.PERSON ed\n" +
+ "testdb.PERSON ed\n" +
"ON e.PersonId = ed.PersonId\n" +
"WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'";
testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });