diff options
8 files changed, 195 insertions, 75 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java index 4a5d118ce..f16a2ce53 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,11 +18,21 @@ package org.apache.drill.exec.store.mapr.db; import java.io.IOException; +import java.util.List; +import com.mapr.fs.tables.TableProperties; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.mapr.TableFormatMatcher; import org.apache.drill.exec.store.mapr.TableFormatPlugin; import com.mapr.fs.MapRFileStatus; +import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable; +import org.apache.hadoop.fs.Path; public class MapRDBFormatMatcher extends TableFormatMatcher { @@ -39,4 +49,26 @@ public class MapRDBFormatMatcher extends TableFormatMatcher { .getIsMarlinTable(); } + @Override + public DrillTable isReadable(DrillFileSystem fs, + FileSelection selection, FileSystemPlugin fsPlugin, + String storageEngineName, String userName) throws IOException { + + if (isFileReadable(fs, selection.getFirstPath(fs))) { + List<String> files = selection.getFiles(); + assert (files.size() == 1); + String tableName = files.get(0); + TableProperties props = 
getFormatPlugin().getMaprFS().getTableProperties(new Path(tableName)); + + if (props.getAttr().getJson()) { + return new DynamicDrillTable(fsPlugin, storageEngineName, userName, + new FormatSelection(getFormatPlugin().getConfig(), selection)); + } else { + FormatSelection formatSelection = new FormatSelection(getFormatPlugin().getConfig(), selection); + return new MapRDBBinaryTable(storageEngineName, fsPlugin, (MapRDBFormatPlugin) getFormatPlugin(), formatSelection); + } + } + return null; + } + } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index 755ae4f75..365166b87 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -73,9 +73,7 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { @Override public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException { - List<String> files = selection.getFiles(); - assert (files.size() == 1); - String tableName = files.get(0); + String tableName = getTableName(selection); TableProperties props = getMaprFS().getTableProperties(new Path(tableName)); if (props.getAttr().getJson()) { @@ -97,4 +95,17 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { return connection; } + /** + * Returns the table name held by a FileSelection object, which must contain exactly one file + * + * @param selection File selection object + * @return the table name as a string + */ + @JsonIgnore + public String getTableName(FileSelection selection) { + List<String> files = selection.getFiles(); + assert (files.size() == 1); + return files.get(0); + } + } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java index 282b84852..16b979e51 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; import org.apache.drill.exec.store.hbase.HBaseScanSpec; +import org.apache.drill.exec.store.hbase.HBaseUtils; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan; @@ -113,8 +114,8 @@ public class
BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC @Override public GroupScan clone(List<SchemaPath> columns) { BinaryTableGroupScan newScan = new BinaryTableGroupScan(this); - newScan.columns = columns; - newScan.verifyColumns(); + newScan.columns = columns == null ? ALL_COLUMNS : columns; + HBaseUtils.verifyColumns(newScan.columns, hTableDesc); return newScan; } @@ -145,21 +146,9 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC } catch (Exception e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } - verifyColumns(); + HBaseUtils.verifyColumns(columns, hTableDesc); } - private void verifyColumns() { - /* - if (columns != null) { - for (SchemaPath column : columns) { - if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { - DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", - column.getRootSegment().getPath(), hTableDesc.getNameAsString()); - } - } - } - */ - } protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) { HBaseScanSpec spec = hbaseScanSpec; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBBinaryTable.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBBinaryTable.java new file mode 100644 index 000000000..23aa43b29 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBBinaryTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db.binary; + + +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.hbase.AbstractHBaseDrillTable; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; + +public class MapRDBBinaryTable extends AbstractHBaseDrillTable { + + public MapRDBBinaryTable(String storageEngineName, FileSystemPlugin storagePlugin, MapRDBFormatPlugin formatPlugin, + FormatSelection selection) { + super(storageEngineName, storagePlugin, selection); + setTableDesc(formatPlugin.getConnection(), formatPlugin.getTableName(selection.getSelection())); + } + +} diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java new file mode 100644 index 000000000..32f4d4317 --- /dev/null +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hbase; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.store.StoragePlugin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.drill.exec.store.hbase.DrillHBaseConstants.ROW_KEY; + +public abstract class AbstractHBaseDrillTable extends DrillTable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHBaseDrillTable.class); + + protected HTableDescriptor tableDesc; + + public AbstractHBaseDrillTable(String storageEngineName, StoragePlugin plugin, Object selection) { + super(storageEngineName, plugin, selection); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + ArrayList<RelDataType> typeList = new ArrayList<>(); + ArrayList<String> fieldNameList = new ArrayList<>(); + + fieldNameList.add(ROW_KEY); + typeList.add(typeFactory.createSqlType(SqlTypeName.ANY)); + + Set<byte[]> families = tableDesc.getFamiliesKeys(); + for (byte[] family : families) { + fieldNameList.add(Bytes.toString(family)); + 
typeList.add(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY))); + } + return typeFactory.createStructType(typeList, fieldNameList); + } + + /** + * Loads the HTableDescriptor of the given table into tableDesc; the Admin handle used for the lookup is closed afterwards + * + * @param connection connection to the server that is used to obtain the Admin handle + * @param tableName the name of the table whose descriptor is loaded + */ + protected void setTableDesc(Connection connection, String tableName) { + try (org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) { + tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName)); + } catch (IOException e) { + throw UserException.dataReadError() + .message("Failure while loading table %s in database %s.", tableName, getStorageEngineName()) + .addContext("Message: ", e.getMessage()) + .build(logger); + } + } + +} diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java index b916ae744..16de5eb99 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -17,51 +17,13 @@ */ package org.apache.drill.exec.store.hbase; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Set; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.planner.logical.DrillTable; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.util.Bytes; -public class DrillHBaseTable extends DrillTable implements DrillHBaseConstants { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHBaseTable.class); - - private HTableDescriptor tableDesc; +public class DrillHBaseTable extends AbstractHBaseDrillTable { public DrillHBaseTable(String storageEngineName, HBaseStoragePlugin plugin, HBaseScanSpec scanSpec) { super(storageEngineName, plugin, scanSpec); - try(Admin admin = plugin.getConnection().getAdmin()) { - tableDesc = admin.getTableDescriptor(TableName.valueOf(scanSpec.getTableName())); - } catch (IOException e) { - throw UserException.dataReadError() - .message("Failure while loading table %s in database %s.", scanSpec.getTableName(), storageEngineName) - .addContext("Message: ", e.getMessage()) - .build(logger); - } - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - ArrayList<RelDataType> typeList = new ArrayList<>(); - ArrayList<String> fieldNameList = new ArrayList<>(); - - fieldNameList.add(ROW_KEY); - typeList.add(typeFactory.createSqlType(SqlTypeName.ANY)); - - Set<byte[]> families = tableDesc.getFamiliesKeys(); - for (byte[] family : families) { - fieldNameList.add(Bytes.toString(family)); - 
typeList.add(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY))); - } - return typeFactory.createStructType(typeList, fieldNameList); + setTableDesc(plugin.getConnection(), scanSpec.getTableName()); } } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 2b8cf18ea..e143e2379 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -176,19 +176,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst } catch (IOException e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } - verifyColumns(); - } - - private void verifyColumns() { - if (Utilities.isStarQuery(columns)) { - return; - } - for (SchemaPath column : columns) { - if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { - DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", - column.getRootSegment().getPath(), hTableDesc.getNameAsString()); - } - } + HBaseUtils.verifyColumns(columns, hTableDesc); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java index 0d804cde7..fc8d09d6f 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,10 @@ import java.nio.charset.CharacterCodingException; import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.util.Utilities; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.ParseFilter; @@ -139,4 +142,26 @@ public class HBaseUtils { return Bytes.compareTo(left, right) < 0 ? left : right; } + + /** + * Verifies that each selected column is either the row key column or refers to a column family that exists + * in the given HBase table. Star queries are accepted without further checks. + * + * @param columns List of the selected columns + * @param hTableDesc HTableDescriptor of HBase/MapR-DB_binary table (contains the details about that table) + * @throws DrillRuntimeException if a selected column is neither the row key column nor an existing column family. + */ + public static void verifyColumns(List<SchemaPath> columns, HTableDescriptor hTableDesc) { + if (Utilities.isStarQuery(columns)) { + return; + } + for (SchemaPath column : columns) { + if (!(column.equals(DrillHBaseConstants.ROW_KEY_PATH) || + hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { + DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", + column.getRootSegment().getPath(), hTableDesc.getNameAsString()); + } + } + } + } |