aboutsummaryrefslogtreecommitdiff
path: root/contrib/storage-hbase/src
diff options
context:
space:
mode:
authorPrasad Nagaraj Subramanya <prasadns14@gmail.com>2017-10-25 21:20:07 -0700
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2017-11-13 11:06:20 +0200
commitdfd43d020498c09dcb2c3fed4e8c6df23d755d55 (patch)
treede66e57e34c6f968f8aff94b605d9f7d489c365a /contrib/storage-hbase/src
parent59c7447262a22f7f1099f1e0f6d33d44acf8813f (diff)
DRILL-5896: Handle HBase columns vector creation in the HBaseRecordReader
closes #1005
Diffstat (limited to 'contrib/storage-hbase/src')
-rw-r--r--contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java34
1 files changed, 23 insertions, 11 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 4e822dffd..631c44dec 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -75,6 +75,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
private TableName hbaseTableName;
private Scan hbaseScan;
+ // scan instance to capture columns for vector creation
+ private Scan hbaseScanColumnsOnly;
+ private Set<String> completeFamilies;
private OperatorContext operatorContext;
private boolean rowKeyOnly;
@@ -87,6 +90,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
hbaseTableName = TableName.valueOf(
Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
+ hbaseScanColumnsOnly = new Scan();
hbaseScan
.setFilter(subScanSpec.getScanFilter())
.setCaching(TARGET_RECORD_COUNT);
@@ -108,7 +112,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
- Set<String> completeFamilies = Sets.newHashSet();
+ completeFamilies = Sets.newHashSet();
+
rowKeyOnly = true;
if (!isStarQuery()) {
for (SchemaPath column : columns) {
@@ -121,16 +126,18 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
byte[] family = root.getPath().getBytes();
transformed.add(SchemaPath.getSimplePath(root.getPath()));
PathSegment child = root.getChild();
- if (!completeFamilies.contains(new String(family, StandardCharsets.UTF_8).toLowerCase())) {
- if (child != null && child.isNamed()) {
- byte[] qualifier = child.getNameSegment().getPath().getBytes();
+ if (child != null && child.isNamed()) {
+ byte[] qualifier = child.getNameSegment().getPath().getBytes();
+ hbaseScanColumnsOnly.addColumn(family, qualifier);
+ if (!completeFamilies.contains(root.getPath())) {
hbaseScan.addColumn(family, qualifier);
- } else {
- hbaseScan.addFamily(family);
- completeFamilies.add(new String(family, StandardCharsets.UTF_8).toLowerCase());
}
+ } else {
+ hbaseScan.addFamily(family);
+ completeFamilies.add(root.getPath());
}
}
+
/* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
* to fetch only one KV from each row. If a filter is already part of this
* scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
@@ -168,11 +175,10 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
}
}
- // Add map and child vectors for any HBase column families and/or HBase
- // columns that are requested (in order to avoid later creation of dummy
- // NullableIntVectors for them).
+ // Add map and child vectors for any HBase columns that are requested (in
+ // order to avoid later creation of dummy NullableIntVectors for them).
final Set<Map.Entry<byte[], NavigableSet<byte []>>> familiesEntries =
- hbaseScan.getFamilyMap().entrySet();
+ hbaseScanColumnsOnly.getFamilyMap().entrySet();
for (Map.Entry<byte[], NavigableSet<byte []>> familyEntry : familiesEntries) {
final String familyName = new String(familyEntry.getKey(),
StandardCharsets.UTF_8);
@@ -186,6 +192,12 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
}
}
}
+
+ // Add map vectors for any HBase column families that are requested.
+ for (String familyName : completeFamilies) {
+ getOrCreateFamilyVector(familyName, false);
+ }
+
resultScanner = hTable.getScanner(hbaseScan);
} catch (SchemaChangeException | IOException e) {
throw new ExecutionSetupException(e);