diff options
Diffstat (limited to 'contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java')
-rw-r--r-- | contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java | 57 |
1 files changed, 40 insertions, 17 deletions
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java index a7322454b..3484ab32a 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.sql.DrillSqlOperator; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan; +import org.apache.drill.exec.store.hive.HiveMetadataProvider; import org.apache.drill.exec.store.hive.HiveReadEntry; import org.apache.drill.exec.store.hive.HiveScan; import org.apache.drill.exec.store.hive.HiveTableWithColumnCache; @@ -84,12 +85,12 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim * {@link MapredParquetInputFormat} * 4) No error occurred while checking for the above conditions. An error is logged as warning. * - * @param call + * @param call rule call * @return True if the rule can be applied. 
False otherwise */ @Override public boolean matches(RelOptRuleCall call) { - final DrillScanRel scanRel = (DrillScanRel) call.rel(0); + final DrillScanRel scanRel = call.rel(0); if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) { return false; @@ -99,6 +100,10 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final HiveConf hiveConf = hiveScan.getHiveConf(); final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable(); + if (containsUnsupportedDataTypes(hiveTable)) { + return false; + } + final Class<? extends InputFormat<?,?>> tableInputFormat = getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(), hiveConf); @@ -139,9 +144,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim /** * Get the input format from given {@link StorageDescriptor} - * @param properties - * @param hiveReadEntry - * @param sd + * @param properties table properties + * @param hiveReadEntry hive read entry + * @param sd storage descriptor * @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning. */ private Class<? 
extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties, @@ -166,25 +171,41 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim @Override public void onMatch(RelOptRuleCall call) { try { - final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0); + final DrillScanRel hiveScanRel = call.rel(0); final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan(); final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); final String partitionColumnLabel = settings.getFsPartitionColumnLabel(); final Table hiveTable = hiveScan.getHiveReadEntry().getTable(); - checkForUnsupportedDataTypes(hiveTable); + final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry(); - final Map<String, String> partitionColMapping = - getPartitionColMapping(hiveTable, partitionColumnLabel); + final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf()); + final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry); + + if (logicalInputSplits.isEmpty()) { + // table is empty, use original scan + return; + } - final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel); + final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel); + final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits); if (hiveScanRel.getRowType().getFieldCount() == 0) { call.transformTo(nativeScanRel); } else { final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel); call.transformTo(projectRel); } + + + /* + Drill native scan should take precedence over Hive since it's more efficient and faster. + Hive does not always give correct costing (i.e. 
for external tables Hive does not have number of rows + and we calculate them approximately). On the contrary, Drill calculates number of rows exactly + and thus Hive Scan can be chosen instead of Drill native scan because costing is allegedly lower for Hive. + To ensure Drill native scan will be chosen, reduce Hive scan importance to 0. + */ + call.getPlanner().setImportance(hiveScanRel, 0.0); } catch (final Exception e) { logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e); } @@ -208,7 +229,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim * Helper method which creates a DrillScalRel with native HiveScan. */ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping, - final DrillScanRel hiveScanRel) throws Exception{ + final DrillScanRel hiveScanRel, + final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws Exception { final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory(); final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR); @@ -245,10 +267,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final HiveDrillNativeParquetScan nativeHiveScan = new HiveDrillNativeParquetScan( hiveScan.getUserName(), - hiveScan.getHiveReadEntry(), - hiveScan.getStoragePlugin(), nativeScanCols, - null); + hiveScan.getStoragePlugin(), + logicalInputSplits); return new DrillScanRel( hiveScanRel.getCluster(), @@ -321,15 +342,17 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim return rb.makeCast(outputType, inputRef); } - private void checkForUnsupportedDataTypes(final Table hiveTable) { - for(FieldSchema hiveField : hiveTable.getSd().getCols()) { + private boolean containsUnsupportedDataTypes(final Table hiveTable) { + for (FieldSchema hiveField : hiveTable.getSd().getCols()) { final Category category = 
TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory(); if (category == Category.MAP || category == Category.STRUCT || category == Category.UNION || category == Category.LIST) { - HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString()); + logger.debug("Hive table contains unsupported data type: {}", category); + return true; } } + return false; } } |