Diffstat (limited to 'contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java')
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java | 57
1 file changed, 40 insertions(+), 17 deletions(-)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index a7322454b..3484ab32a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
@@ -84,12 +85,12 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
* {@link MapredParquetInputFormat}
* 4) No error occurred while checking the above conditions (any error is logged as a warning).
*
- * @param call
+ * @param call rule call
* @return true if the rule can be applied, false otherwise
*/
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scanRel = (DrillScanRel) call.rel(0);
+ final DrillScanRel scanRel = call.rel(0);
if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
return false;
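The removed cast above is safe to drop because Calcite's RelOptRuleCall.rel(int) is declared with a generic return type, so the compiler infers the target type from the assignment. A minimal sketch, assuming Calcite's RelOptRuleCall API (not part of this patch):

    // RelOptRuleCall declares: <T extends RelNode> T rel(int ordinal)
    // The matched rel therefore binds without an explicit cast:
    final DrillScanRel scanRel = call.rel(0); // inferred as DrillScanRel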
@@ -99,6 +100,10 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final HiveConf hiveConf = hiveScan.getHiveConf();
final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
+ if (containsUnsupportedDataTypes(hiveTable)) {
+ return false;
+ }
+
final Class<? extends InputFormat<?,?>> tableInputFormat =
getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(),
hiveConf);
@@ -139,9 +144,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
/**
* Get the input format from given {@link StorageDescriptor}
- * @param properties
- * @param hiveReadEntry
- * @param sd
+ * @param properties table properties
+ * @param hiveReadEntry hive read entry
+ * @param sd storage descriptor
* @return {@link InputFormat} class, or null if a failure occurred. Failures are logged as warnings.
*/
private Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties,
@@ -166,25 +171,41 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
@Override
public void onMatch(RelOptRuleCall call) {
try {
- final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+ final DrillScanRel hiveScanRel = call.rel(0);
final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
- checkForUnsupportedDataTypes(hiveTable);
+ final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
- final Map<String, String> partitionColMapping =
- getPartitionColMapping(hiveTable, partitionColumnLabel);
+ final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+ final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
+
+ if (logicalInputSplits.isEmpty()) {
+ // table is empty, use original scan
+ return;
+ }
- final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
+ final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel);
+ final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits);
if (hiveScanRel.getRowType().getFieldCount() == 0) {
call.transformTo(nativeScanRel);
} else {
final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
call.transformTo(projectRel);
}
+
+ /*
+ The Drill native scan should take precedence over the Hive scan since it is more efficient and faster.
+ Hive does not always produce correct costing (e.g. for external tables Hive does not know the number of rows
+ and we estimate it), whereas Drill calculates the number of rows exactly. The planner can therefore
+ choose the Hive scan over the Drill native scan because its cost is allegedly lower.
+ To ensure the Drill native scan is chosen, reduce the Hive scan importance to 0.
+ */
+ call.getPlanner().setImportance(hiveScanRel, 0.0);
} catch (final Exception e) {
logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e);
}
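Two changes in onMatch() carry the weight of this patch. The input splits are now fetched once, up front, through HiveMetadataProvider, and an empty split list short-circuits the conversion so an empty table keeps its original scan. After the transform, setImportance(hiveScanRel, 0.0) tells Calcite's planner to stop exploring the original HiveScan, so the native scan cannot lose on Hive's potentially inaccurate costing. A condensed sketch of the flow, using names from the patch with error handling omitted:

    // Fetch the logical input splits once, in the rule:
    final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(
        hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
    final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits =
        hiveMetadataProvider.getInputSplits(hiveReadEntry);
    if (logicalInputSplits.isEmpty()) {
      return; // empty table: keep the original HiveScan
    }
    // ... build and register the native scan, then demote the original rel:
    call.transformTo(nativeScanRel);
    call.getPlanner().setImportance(hiveScanRel, 0.0); // importance 0: never chosen over the alternative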
@@ -208,7 +229,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
* Helper method which creates a DrillScanRel with a native HiveScan.
*/
private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
- final DrillScanRel hiveScanRel) throws Exception{
+ final DrillScanRel hiveScanRel,
+ final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws Exception {
final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
@@ -245,10 +267,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final HiveDrillNativeParquetScan nativeHiveScan =
new HiveDrillNativeParquetScan(
hiveScan.getUserName(),
- hiveScan.getHiveReadEntry(),
- hiveScan.getStoragePlugin(),
nativeScanCols,
- null);
+ hiveScan.getStoragePlugin(),
+ logicalInputSplits);
return new DrillScanRel(
hiveScanRel.getCluster(),
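The constructor reshuffle above follows from the same idea: HiveDrillNativeParquetScan now receives the pre-computed logical input splits instead of a HiveReadEntry, which presumably spares the group scan from re-deriving the splits on its own. Side by side, as called from this rule:

    // before: new HiveDrillNativeParquetScan(userName, hiveReadEntry, storagePlugin, nativeScanCols, null);
    // after:  new HiveDrillNativeParquetScan(userName, nativeScanCols, storagePlugin, logicalInputSplits);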
@@ -321,15 +342,17 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
return rb.makeCast(outputType, inputRef);
}
- private void checkForUnsupportedDataTypes(final Table hiveTable) {
- for(FieldSchema hiveField : hiveTable.getSd().getCols()) {
+ private boolean containsUnsupportedDataTypes(final Table hiveTable) {
+ for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
if (category == Category.MAP ||
category == Category.STRUCT ||
category == Category.UNION ||
category == Category.LIST) {
- HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString());
+ logger.debug("Hive table contains unsupported data type: {}", category);
+ return true;
}
}
+ return false;
}
}
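Finally, the type check is reworked from throw-on-unsupported into a boolean predicate, which lets matches() veto the rule cleanly instead of failing later inside onMatch(). For reference, a self-contained sketch of the same category test, with a hypothetical helper name and assuming Hive's serde2 TypeInfo API:

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    final class HiveTypeCheck {
      // True for the complex categories the Drill native Parquet reader cannot handle.
      static boolean isComplexHiveType(String hiveTypeString) {
        Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveTypeString).getCategory();
        return category == Category.MAP
            || category == Category.STRUCT
            || category == Category.UNION
            || category == Category.LIST;
      }
    }

    // Usage: isComplexHiveType("array<int>") -> true; isComplexHiveType("int") -> false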