diff options
author | Prasad Nagaraj Subramanya <prasadns14@gmail.com> | 2017-10-06 21:17:57 -0700 |
---|---|---|
committer | Paul Rogers <progers@maprtech.com> | 2017-10-16 12:09:38 -0700 |
commit | e90d96a5240ab2d1c7200bd8f4caa54e9b9cba45 (patch) | |
tree | e2d725fa2f2c6d0b5e0a1d5f6971a495d6030dff /contrib/storage-hbase/src | |
parent | bad6c1c991a18fe360c5ecff1704692c76438542 (diff) |
DRILL-5743: Handling column family and column scan for hbase
closes #975
Diffstat (limited to 'contrib/storage-hbase/src')
4 files changed, 156 insertions, 6 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index cae7ce412..4e822dffd 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -94,9 +94,21 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas setColumns(projectedColumns); } + /** + * Provides the projected columns information to the Hbase Scan instance. If the + * projected columns list contains a column family and also a column in the + * column family, only the column family is passed to the Scan instance. + * + * For example, if the projection list is {cf1, cf1.col1, cf2.col1} then we only + * pass {cf1, cf2.col1} to the Scan instance. + * + * @param columns collection of projected columns + * @return collection of projected column family names + */ @Override protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { Set<SchemaPath> transformed = Sets.newLinkedHashSet(); + Set<String> completeFamilies = Sets.newHashSet(); rowKeyOnly = true; if (!isStarQuery()) { for (SchemaPath column : columns) { @@ -109,11 +121,14 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas byte[] family = root.getPath().getBytes(); transformed.add(SchemaPath.getSimplePath(root.getPath())); PathSegment child = root.getChild(); - if (child != null && child.isNamed()) { - byte[] qualifier = child.getNameSegment().getPath().getBytes(); - hbaseScan.addColumn(family, qualifier); - } else { - hbaseScan.addFamily(family); + if (!completeFamilies.contains(new String(family, StandardCharsets.UTF_8).toLowerCase())) { + if (child != null && child.isNamed()) { + byte[] qualifier = child.getNameSegment().getPath().getBytes(); + hbaseScan.addColumn(family, qualifier); + } else { + hbaseScan.addFamily(family); + completeFamilies.add(new String(family, StandardCharsets.UTF_8).toLowerCase()); + } } } /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index 51038a871..5d94d248f 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -22,6 +22,8 @@ import org.apache.drill.categories.SlowTest; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.apache.drill.TestBuilder.mapOf; + @Category({SlowTest.class, HbaseStorageTest.class}) public class HBaseRecordReaderTest extends BaseHBaseTest { @@ -43,4 +45,52 @@ public class HBaseRecordReaderTest extends BaseHBaseTest { runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 4); } + @Test + public void testOrderBy() throws Exception { + testBuilder() + .sqlQuery("select cast(row_key AS VARCHAR) as row_key, t.f from hbase.`TestTable2` t order by t.f.c1") + .ordered() + .baselineColumns("row_key", "f") + .baselineValues( + "a2", + mapOf( + "c1", "11".getBytes(), + "c2", "12".getBytes(), + "c3", "13".getBytes())) + .baselineValues( + "a1", + mapOf( + "c1", "21".getBytes(), + "c2", "22".getBytes(), + "c3", "23".getBytes())) + .baselineValues( + "a3", + mapOf( + "c1", "31".getBytes(), + "c2", "32".getBytes(), + "c3", "33".getBytes())) + .go(); + } + + @Test + public void testMultiCFDifferentCase() throws Exception { + testBuilder() + .sqlQuery("select * from hbase.`TestTableMultiCF` t") + .unOrdered() + .baselineColumns("row_key", "F", "f0") + .baselineValues( + "a1".getBytes(), + mapOf("c3", "23".getBytes()), + mapOf("c3", "23".getBytes())) + .baselineValues( + "a2".getBytes(), + mapOf("c3", "13".getBytes()), + mapOf("c3", "13".getBytes())) + .baselineValues( + "a3".getBytes(), + mapOf("c3", "33".getBytes()), + mapOf("c3", "33".getBytes())) + .go(); + } + } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 1a176ca3b..ef7013149 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -62,7 +62,9 @@ public class HBaseTestsSuite { private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0; protected static final TableName TEST_TABLE_1 = TableName.valueOf("TestTable1"); + protected static final TableName TEST_TABLE_2 = TableName.valueOf("TestTable2"); protected static final TableName TEST_TABLE_3 = TableName.valueOf("TestTable3"); + protected static final TableName TEST_TABLE_MULTI_CF_DIFFERENT_CASE = TableName.valueOf("TestTableMultiCF"); protected static final TableName TEST_TABLE_COMPOSITE_DATE = TableName.valueOf("TestTableCompositeDate"); protected static final TableName TEST_TABLE_COMPOSITE_TIME = TableName.valueOf("TestTableCompositeTime"); protected static final TableName TEST_TABLE_COMPOSITE_INT = TableName.valueOf("TestTableCompositeInt"); @@ -164,7 +166,9 @@ public class HBaseTestsSuite { } private static boolean tablesExist() throws IOException { - return admin.tableExists(TEST_TABLE_1) && admin.tableExists(TEST_TABLE_3) + return admin.tableExists(TEST_TABLE_1) && admin.tableExists(TEST_TABLE_2) + && admin.tableExists(TEST_TABLE_3) + && admin.tableExists(TEST_TABLE_MULTI_CF_DIFFERENT_CASE) && admin.tableExists(TEST_TABLE_COMPOSITE_DATE) && admin.tableExists(TEST_TABLE_COMPOSITE_TIME) && admin.tableExists(TEST_TABLE_COMPOSITE_INT) @@ -188,7 +192,9 @@ public class HBaseTestsSuite { * Will revert to multiple region once the issue is resolved. */ TestTableGenerator.generateHBaseDataset1(conn, admin, TEST_TABLE_1, 2); + TestTableGenerator.generateHBaseDatasetSingleSchema(conn, admin, TEST_TABLE_2, 1); TestTableGenerator.generateHBaseDataset3(conn, admin, TEST_TABLE_3, 1); + TestTableGenerator.generateHBaseDatasetMultiCF(conn, admin, TEST_TABLE_MULTI_CF_DIFFERENT_CASE, 1); TestTableGenerator.generateHBaseDatasetCompositeKeyDate(conn, admin, TEST_TABLE_COMPOSITE_DATE, 1); TestTableGenerator.generateHBaseDatasetCompositeKeyTime(conn, admin, TEST_TABLE_COMPOSITE_TIME, 1); TestTableGenerator.generateHBaseDatasetCompositeKeyInt(conn, admin, TEST_TABLE_COMPOSITE_INT, 1); @@ -206,8 +212,12 @@ public class HBaseTestsSuite { private static void cleanupTestTables() throws IOException { admin.disableTable(TEST_TABLE_1); admin.deleteTable(TEST_TABLE_1); + admin.disableTable(TEST_TABLE_2); + admin.deleteTable(TEST_TABLE_2); admin.disableTable(TEST_TABLE_3); admin.deleteTable(TEST_TABLE_3); + admin.disableTable(TEST_TABLE_MULTI_CF_DIFFERENT_CASE); + admin.deleteTable(TEST_TABLE_MULTI_CF_DIFFERENT_CASE); admin.disableTable(TEST_TABLE_COMPOSITE_DATE); admin.deleteTable(TEST_TABLE_COMPOSITE_DATE); admin.disableTable(TEST_TABLE_COMPOSITE_TIME); diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java index 73df7e412..e69f4e227 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java @@ -133,6 +133,81 @@ public class TestTableGenerator { table.close(); } + public static void generateHBaseDatasetSingleSchema(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + if (numberRegions > 1) { + admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1)); + } else { + admin.createTable(desc); + } + + BufferedMutator table = conn.getBufferedMutator(tableName); + + Put p = new Put("a1".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "23".getBytes()); + table.mutate(p); + + p = new Put("a2".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "13".getBytes()); + table.mutate(p); + + p = new Put("a3".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "33".getBytes()); + table.mutate(p); + + table.close(); + } + + public static void generateHBaseDatasetMultiCF(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + desc.addFamily(new HColumnDescriptor("F")); + if (numberRegions > 1) { + admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1)); + } else { + admin.createTable(desc); + } + + BufferedMutator table = conn.getBufferedMutator(tableName); + + Put p = new Put("a1".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes()); + p.addColumn("F".getBytes(), "c3".getBytes(), "23".getBytes()); + table.mutate(p); + + p = new Put("a2".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes()); + p.addColumn("F".getBytes(), "c3".getBytes(), "13".getBytes()); + table.mutate(p); + + p = new Put("a3".getBytes()); + p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes()); + p.addColumn("F".getBytes(), "c3".getBytes(), "33".getBytes()); + table.mutate(p); + + table.close(); + } + public static void generateHBaseDataset2(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); |