path: root/contrib/storage-hive
author    Arina Ielchiieva <arina.yelchiyeva@gmail.com>  2018-03-20 18:29:45 +0000
committer Arina Ielchiieva <arina.yelchiyeva@gmail.com>  2018-04-27 11:41:22 +0300
commit    c6549e58859397c88cb1de61b4f6eee52a07ed0c (patch)
tree      60a4a3d48d095f5afe7d9e07a86a8114be4e85b2 /contrib/storage-hive
parent    84cd83495adf7b0a80932535809c58a1cd3324e9 (diff)
DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)
1. Factored out common logic for the Drill parquet reader and the Hive Drill native parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, AbstractParquetScanBatchCreator.
2. Rules that previously worked only with ParquetGroupScan can now be applied to any class that extends AbstractParquetGroupScan: DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule.
3. Hive populates partition values based on information returned from the Hive metastore, while Drill populates partition values based on the path difference between the selection root and the actual file path. Previously ColumnExplorer populated partition values using the Drill approach. Since ColumnExplorer now also populates values for parquet files from Hive tables, the `populateImplicitColumns` method logic was changed to populate partition columns only from the given partition values (a sketch follows below).
4. Refactored ParquetPartitionDescriptor to be responsible for populating partition values rather than keeping this logic in the parquet group scan class.
5. Moved the Metadata class to a separate metadata package (org.apache.drill.exec.store.parquet.metadata) and factored out several inner classes to improve code readability.
6. Collected all Drill native parquet reader unit tests into one class, TestHiveDrillNativeParquetReader, and added new tests to cover the new functionality.
7. Reduced excessive logging when parquet file metadata is read.
closes #1214
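For item 3, a minimal illustrative sketch (hypothetical helper names, not the actual ColumnExplorer signature) of building implicit partition columns solely from given partition values rather than deriving them from the file path:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionColumnsSketch {

  // Builds implicit partition columns (dir0, dir1, ...) from partition values
  // supplied by the group scan, instead of parsing them out of the file path.
  static Map<String, String> populatePartitionColumns(String partitionDesignator, List<String> partitionValues) {
    Map<String, String> implicitValues = new LinkedHashMap<>();
    for (int i = 0; i < partitionValues.size(); i++) {
      implicitValues.put(partitionDesignator + i, partitionValues.get(i));
    }
    return implicitValues;
  }

  public static void main(String[] args) {
    // Hive partition values ["2018", "03"] become {dir0=2018, dir1=03}
    System.out.println(populatePartitionColumns("dir", Arrays.asList("2018", "03")));
  }
}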
Diffstat (limited to 'contrib/storage-hive')
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java  57
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java  131
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java  204
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java  79
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java  55
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java  208
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java  5
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java  95
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java  248
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java  19
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java  13
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java  2
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java  215
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java  6
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java  2
-rw-r--r--  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java  82
-rwxr-xr-x  contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet  bin 0 -> 220 bytes
-rwxr-xr-x  contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet  bin 0 -> 220 bytes
18 files changed, 847 insertions, 574 deletions
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index a7322454b..3484ab32a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
@@ -84,12 +85,12 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
* {@link MapredParquetInputFormat}
* 4) No error occurred while checking for the above conditions. An error is logged as warning.
*
- * @param call
+ * @param call rule call
* @return True if the rule can be applied. False otherwise
*/
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scanRel = (DrillScanRel) call.rel(0);
+ final DrillScanRel scanRel = call.rel(0);
if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
return false;
@@ -99,6 +100,10 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final HiveConf hiveConf = hiveScan.getHiveConf();
final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
+ if (containsUnsupportedDataTypes(hiveTable)) {
+ return false;
+ }
+
final Class<? extends InputFormat<?,?>> tableInputFormat =
getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(),
hiveConf);
@@ -139,9 +144,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
/**
* Get the input format from given {@link StorageDescriptor}
- * @param properties
- * @param hiveReadEntry
- * @param sd
+ * @param properties table properties
+ * @param hiveReadEntry hive read entry
+ * @param sd storage descriptor
* @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
*/
private Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties,
@@ -166,25 +171,41 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
@Override
public void onMatch(RelOptRuleCall call) {
try {
- final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+ final DrillScanRel hiveScanRel = call.rel(0);
final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
- checkForUnsupportedDataTypes(hiveTable);
+ final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
- final Map<String, String> partitionColMapping =
- getPartitionColMapping(hiveTable, partitionColumnLabel);
+ final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+ final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
+
+ if (logicalInputSplits.isEmpty()) {
+ // table is empty, use original scan
+ return;
+ }
- final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
+ final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel);
+ final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits);
if (hiveScanRel.getRowType().getFieldCount() == 0) {
call.transformTo(nativeScanRel);
} else {
final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
call.transformTo(projectRel);
}
+
+
+ /*
+ Drill native scan should take precedence over Hive scan since it is more efficient and faster.
+ Hive does not always produce correct costing (e.g. for external tables Hive does not know the number of rows
+ and we estimate it approximately). Drill, on the contrary, calculates the number of rows exactly,
+ thus the Hive scan can be chosen over the Drill native scan because its costing is allegedly lower.
+ To ensure the Drill native scan is chosen, reduce the Hive scan importance to 0.
+ */
+ call.getPlanner().setImportance(hiveScanRel, 0.0);
} catch (final Exception e) {
logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e);
}
@@ -208,7 +229,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
* Helper method which creates a DrillScanRel with a native HiveScan.
*/
private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
- final DrillScanRel hiveScanRel) throws Exception{
+ final DrillScanRel hiveScanRel,
+ final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws Exception {
final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
@@ -245,10 +267,9 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final HiveDrillNativeParquetScan nativeHiveScan =
new HiveDrillNativeParquetScan(
hiveScan.getUserName(),
- hiveScan.getHiveReadEntry(),
- hiveScan.getStoragePlugin(),
nativeScanCols,
- null);
+ hiveScan.getStoragePlugin(),
+ logicalInputSplits);
return new DrillScanRel(
hiveScanRel.getCluster(),
@@ -321,15 +342,17 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
return rb.makeCast(outputType, inputRef);
}
- private void checkForUnsupportedDataTypes(final Table hiveTable) {
- for(FieldSchema hiveField : hiveTable.getSd().getCols()) {
+ private boolean containsUnsupportedDataTypes(final Table hiveTable) {
+ for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
if (category == Category.MAP ||
category == Category.STRUCT ||
category == Category.UNION ||
category == Category.LIST) {
- HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString());
+ logger.debug("Hive table contains unsupported data type: {}", category);
+ return true;
}
}
+ return false;
}
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
new file mode 100644
index 000000000..e22701511
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.List;
+
+@JsonTypeName("hive-drill-native-parquet-row-group-scan")
+public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan {
+
+ private final HiveStoragePlugin hiveStoragePlugin;
+ private final HiveStoragePluginConfig hiveStoragePluginConfig;
+ private final HivePartitionHolder hivePartitionHolder;
+
+ @JsonCreator
+ public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
+ @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+ @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
+ this(userName,
+ (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
+ rowGroupReadEntries,
+ columns,
+ hivePartitionHolder,
+ filter);
+ }
+
+ public HiveDrillNativeParquetRowGroupScan(String userName,
+ HiveStoragePlugin hiveStoragePlugin,
+ List<RowGroupReadEntry> rowGroupReadEntries,
+ List<SchemaPath> columns,
+ HivePartitionHolder hivePartitionHolder,
+ LogicalExpression filter) {
+ super(userName, rowGroupReadEntries, columns, filter);
+ this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
+ this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
+ this.hivePartitionHolder = hivePartitionHolder;
+ }
+
+ @JsonProperty
+ public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+ return hiveStoragePluginConfig;
+ }
+
+ @JsonProperty
+ public HivePartitionHolder getHivePartitionHolder() {
+ return hivePartitionHolder;
+ }
+
+ @JsonIgnore
+ public HiveStoragePlugin getHiveStoragePlugin() {
+ return hiveStoragePlugin;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ }
+
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN_VALUE;
+ }
+
+ @Override
+ public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ }
+
+ @Override
+ public boolean areCorruptDatesAutoCorrected() {
+ return true;
+ }
+
+ @Override
+ public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
+ Path path = new Path(rowGroupReadEntry.getPath()).getParent();
+ return new ProjectionPusher().pushProjectionsAndFilters(
+ new JobConf(hiveStoragePlugin.getHiveConf()),
+ path.getParent());
+ }
+
+ @Override
+ public boolean supportsFileImplicitColumns() {
+ return false;
+ }
+
+ @Override
+ public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+ return hivePartitionHolder.get(rowGroupReadEntry.getPath());
+ }
+}
+
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 202bd435e..03a80d338 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,94 +21,204 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.RowGroupInfo;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
-/**
- * Extension of {@link HiveScan} which support reading Hive tables using Drill's native parquet reader.
- */
@JsonTypeName("hive-drill-native-parquet-scan")
-public class HiveDrillNativeParquetScan extends HiveScan {
+public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
+
+ private final HiveStoragePlugin hiveStoragePlugin;
+ private HivePartitionHolder hivePartitionHolder;
@JsonCreator
- public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
- @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
+ public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
+ @JsonProperty("userName") String userName,
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
- super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry);
+ @JsonProperty("entries") List<ReadEntryWithPath> entries,
+ @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+ @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
+ super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+ this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
+ this.hivePartitionHolder = hivePartitionHolder;
+
+ init();
}
- public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
- List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
- super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+ public HiveDrillNativeParquetScan(String userName,
+ List<SchemaPath> columns,
+ HiveStoragePlugin hiveStoragePlugin,
+ List<LogicalInputSplit> logicalInputSplits) throws IOException {
+ this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
}
- public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
- super(hiveScan);
+ public HiveDrillNativeParquetScan(String userName,
+ List<SchemaPath> columns,
+ HiveStoragePlugin hiveStoragePlugin,
+ List<LogicalInputSplit> logicalInputSplits,
+ LogicalExpression filter) throws IOException {
+ super(userName, columns, new ArrayList<>(), filter);
+
+ this.hiveStoragePlugin = hiveStoragePlugin;
+ this.hivePartitionHolder = new HivePartitionHolder();
+
+ for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
+ Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
+ // a logical input split contains a list of splits grouped by file,
+ // reading the path of only one of them is enough to get the file path
+ assert iterator.hasNext();
+ InputSplit split = iterator.next();
+ assert split instanceof FileSplit;
+ FileSplit fileSplit = (FileSplit) split;
+ Path finalPath = fileSplit.getPath();
+ String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+ entries.add(new ReadEntryWithPath(pathString));
+
+ // store partition values per path
+ Partition partition = logicalInputSplit.getPartition();
+ if (partition != null) {
+ hivePartitionHolder.add(pathString, partition.getValues());
+ }
+ }
+
+ init();
}
- @Override
- public ScanStats getScanStats() {
- final ScanStats nativeHiveScanStats = super.getScanStats();
+ private HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
+ super(that);
+ this.hiveStoragePlugin = that.hiveStoragePlugin;
+ this.hivePartitionHolder = that.hivePartitionHolder;
+ }
- // As Drill's native parquet record reader is faster and memory efficient. Divide the CPU cost
- // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
- return new ScanStats(
- nativeHiveScanStats.getGroupScanProperty(),
- nativeHiveScanStats.getRecordCount(),
- nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(),
- nativeHiveScanStats.getDiskCost());
+ @JsonProperty
+ public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+ return hiveStoragePlugin.getConfig();
}
- @Override
- public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
- try {
- return new HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
- } catch (IOException | ReflectiveOperationException e) {
- throw new ExecutionSetupException(e);
- }
+ @JsonProperty
+ public HivePartitionHolder getHivePartitionHolder() {
+ return hivePartitionHolder;
}
@Override
- public boolean isNativeReader() {
- return true;
+ public SubScan getSpecificScan(int minorFragmentId) {
+ List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+ HivePartitionHolder subPartitionHolder = new HivePartitionHolder();
+ for (RowGroupReadEntry readEntry : readEntries) {
+ List<String> values = hivePartitionHolder.get(readEntry.getPath());
+ subPartitionHolder.add(readEntry.getPath(), values);
+ }
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
return new HiveDrillNativeParquetScan(this);
}
@Override
- public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider());
+ public HiveDrillNativeParquetScan clone(FileSelection selection) throws IOException {
+ HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+ newScan.modifyFileSelection(selection);
+ newScan.init();
+ return newScan;
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
- final HiveDrillNativeParquetScan scan = new HiveDrillNativeParquetScan(this);
- scan.columns = columns;
- return scan;
+ HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+ newScan.columns = columns;
+ return newScan;
}
@Override
public String toString() {
- final List<HivePartitionWrapper> partitions = getHiveReadEntry().getHivePartitionWrappers();
- int numPartitions = partitions == null ? 0 : partitions.size();
- return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper()
- + ", columns=" + getColumns()
- + ", numPartitions=" + numPartitions
- + ", partitions= " + partitions
- + ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
+ StringBuilder builder = new StringBuilder();
+ builder.append("HiveDrillNativeParquetScan [");
+ builder.append("entries=").append(entries);
+ builder.append(", numFiles=").append(getEntries().size());
+ builder.append(", numRowGroups=").append(rowGroupInfos.size());
+
+ String filterString = getFilterString();
+ if (!filterString.isEmpty()) {
+ builder.append(", filter=").append(filterString);
+ }
+
+ builder.append(", columns=").append(columns);
+ builder.append("]");
+
+ return builder.toString();
}
+
+ @Override
+ protected void initInternal() throws IOException {
+ ParquetFormatConfig formatConfig = new ParquetFormatConfig();
+ Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
+ for (ReadEntryWithPath entry : entries) {
+ Path path = new Path(entry.getPath());
+ Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
+ new JobConf(hiveStoragePlugin.getHiveConf()),
+ path.getParent());
+ FileSystem fs = path.getFileSystem(conf);
+ fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+ }
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, formatConfig);
+ }
+
+ @Override
+ protected Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits() {
+ return hiveStoragePlugin.getContext().getBits();
+ }
+
+ @Override
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+ FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+ return clone(newSelection);
+ }
+
+ @Override
+ protected boolean supportsFileImplicitColumns() {
+ return false;
+ }
+
+ @Override
+ protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
+ return hivePartitionHolder.get(rowGroupInfo.getPath());
+ }
+
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
new file mode 100644
index 000000000..669369b3e
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanBatchCreator implements BatchCreator<HiveDrillNativeParquetRowGroupScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, HiveDrillNativeParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ OperatorContext oContext = context.newOperatorContext(rowGroupScan);
+ return getBatch(context, rowGroupScan, oContext);
+ }
+
+ @Override
+ protected AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager) {
+ return new HiveDrillNativeParquetDrillFileSystemManager(operatorContext);
+ }
+
+ /**
+ * Caches a file system per path and returns the cached instance if it already exists.
+ * Creates only non-tracking file systems.
+ */
+ private class HiveDrillNativeParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
+
+ private final Map<String, DrillFileSystem> fileSystems;
+
+ HiveDrillNativeParquetDrillFileSystemManager(OperatorContext operatorContext) {
+ super(operatorContext);
+ this.fileSystems = new HashMap<>();
+ }
+
+ @Override
+ protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+ DrillFileSystem fs = fileSystems.get(path);
+ if (fs == null) {
+ try {
+ fs = operatorContext.newNonTrackingFileSystem(config);
+ } catch (IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create non-tracking DrillFileSystem: %s", e.getMessage()), e);
+ }
+ fileSystems.put(path, fs);
+ }
+ return fs;
+ }
+ }
+
+}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
deleted file mode 100644
index 2129ed454..000000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Extension of {@link HiveSubScan} which support reading Hive tables using Drill's native parquet reader.
- */
-@JsonTypeName("hive-drill-native-parquet-sub-scan")
-public class HiveDrillNativeParquetSubScan extends HiveSubScan {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeParquetSubScan.class);
-
- @JsonCreator
- public HiveDrillNativeParquetSubScan(@JacksonInject StoragePluginRegistry registry,
- @JsonProperty("userName") String userName,
- @JsonProperty("splits") List<List<String>> splits,
- @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
- @JsonProperty("splitClasses") List<String> splitClasses,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig)
- throws IOException, ExecutionSetupException, ReflectiveOperationException {
- super(registry, userName, splits, hiveReadEntry, splitClasses, columns, hiveStoragePluginConfig);
- }
-
- public HiveDrillNativeParquetSubScan(final HiveSubScan subScan)
- throws IOException, ExecutionSetupException, ReflectiveOperationException {
- super(subScan.getUserName(), subScan.getSplits(), subScan.getHiveReadEntry(), subScan.getSplitClasses(),
- subScan.getColumns(), subScan.getStoragePlugin());
- }
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
deleted file mode 100644
index 43318d17c..000000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Functions;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
-import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.util.Utilities;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@SuppressWarnings("unused")
-public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);
-
- @Override
- public ScanBatch getBatch(ExecutorFragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
- throws ExecutionSetupException {
- final HiveTableWithColumnCache table = config.getTable();
- final List<List<InputSplit>> splits = config.getInputSplits();
- final List<HivePartition> partitions = config.getPartitions();
- final List<SchemaPath> columns = config.getColumns();
- final String partitionDesignator = context.getOptions()
- .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
- List<Map<String, String>> implicitColumns = Lists.newLinkedList();
- boolean selectAllQuery = Utilities.isStarQuery(columns);
-
- final boolean hasPartitions = (partitions != null && partitions.size() > 0);
-
- final List<String[]> partitionColumns = Lists.newArrayList();
- final List<Integer> selectedPartitionColumns = Lists.newArrayList();
- List<SchemaPath> tableColumns = columns;
- if (!selectAllQuery) {
- // Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
- // ParquetRecordReader. Partition columns are passed to ScanBatch.
- tableColumns = Lists.newArrayList();
- Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
- for (SchemaPath column : columns) {
- Matcher m = pattern.matcher(column.getRootSegmentPath());
- if (m.matches()) {
- selectedPartitionColumns.add(
- Integer.parseInt(column.getRootSegmentPath().substring(partitionDesignator.length())));
- } else {
- tableColumns.add(column);
- }
- }
- }
-
- final OperatorContext oContext = context.newOperatorContext(config);
-
- int currentPartitionIndex = 0;
- final List<RecordReader> readers = new LinkedList<>();
-
- final HiveConf conf = config.getHiveConf();
-
- // TODO: In future we can get this cache from Metadata cached on filesystem.
- final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
-
- Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
- try {
- for (List<InputSplit> splitGroups : splits) {
- for (InputSplit split : splitGroups) {
- final FileSplit fileSplit = (FileSplit) split;
- final Path finalPath = fileSplit.getPath();
- final JobConf cloneJob =
- new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
- final FileSystem fs = finalPath.getFileSystem(cloneJob);
-
- ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
- if (parquetMetadata == null) {
- parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
- footerCache.put(finalPath.toString(), parquetMetadata);
- }
- final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
-
- for (int rowGroupNum : rowGroupNums) {
- //DRILL-5009 : Skip the row group if the row count is zero
- if (parquetMetadata.getBlocks().get(rowGroupNum).getRowCount() == 0) {
- continue;
- }
- // Drill has only ever written a single row group per file, only detect corruption
- // in the first row group
- ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
- ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
- if (logger.isDebugEnabled()) {
- logger.debug(containsCorruptDates.toString());
- }
- readers.add(new ParquetRecordReader(
- context,
- Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
- rowGroupNum, fs,
- CodecFactory.createDirectCodecFactory(fs.getConf(),
- new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
- parquetMetadata,
- tableColumns,
- containsCorruptDates)
- );
- Map<String, String> implicitValues = Maps.newLinkedHashMap();
-
- if (hasPartitions) {
- List<String> values = partitions.get(currentPartitionIndex).getValues();
- for (int i = 0; i < values.size(); i++) {
- if (selectAllQuery || selectedPartitionColumns.contains(i)) {
- implicitValues.put(partitionDesignator + i, values.get(i));
- }
- }
- }
- implicitColumns.add(implicitValues);
- if (implicitValues.size() > mapWithMaxColumns.size()) {
- mapWithMaxColumns = implicitValues;
- }
- }
- currentPartitionIndex++;
- }
- }
- } catch (final IOException|RuntimeException e) {
- AutoCloseables.close(e, readers);
- throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
- }
-
- // all readers should have the same number of implicit columns, add missing ones with value null
- mapWithMaxColumns = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
- for (Map<String, String> map : implicitColumns) {
- map.putAll(Maps.difference(map, mapWithMaxColumns).entriesOnlyOnRight());
- }
-
- // If there are no readers created (which is possible when the table is empty or no row groups are matched),
- // create an empty RecordReader to output the schema
- if (readers.size() == 0) {
- readers.add(new HiveDefaultReader(table, null, null, tableColumns, context, conf,
- ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
- }
-
- return new ScanBatch(context, oContext, readers, implicitColumns);
- }
-
- /**
- * Get the list of row group numbers for given file input split. Logic used here is same as how Hive's parquet input
- * format finds the row group numbers for input split.
- */
- private List<Integer> getRowGroupNumbersFromFileSplit(final FileSplit split,
- final ParquetMetadata footer) throws IOException {
- final List<BlockMetaData> blocks = footer.getBlocks();
-
- final long splitStart = split.getStart();
- final long splitLength = split.getLength();
-
- final List<Integer> rowGroupNums = Lists.newArrayList();
-
- int i = 0;
- for (final BlockMetaData block : blocks) {
- final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
- if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
- rowGroupNums.add(i);
- }
- i++;
- }
-
- return rowGroupNums;
- }
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index b11ef3b7a..c8775643a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -243,7 +243,10 @@ public class HiveMetadataProvider {
data += split.getLength();
}
- return new HiveStats(data/RECORD_SIZE, data);
+ long numRows = data / RECORD_SIZE;
+ // if the result of the division is zero but the data size > 0, estimate at least one row
+ numRows = numRows == 0 && data > 0 ? 1 : numRows;
+ return new HiveStats(numRows, data);
}
/**
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
new file mode 100644
index 000000000..803144e0b
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class that stores partition values per key.
+ * The key-to-index mapper stores each key together with the index of its partition values in the partition values list.
+ * Since several keys may share the same partition values, this structure is optimized to save memory.
+ * Partition values are stored in list of consecutive values.
+ */
+public class HivePartitionHolder {
+
+ private final Map<String, Integer> keyToIndexMapper;
+ private final List<List<String>> partitionValues;
+
+ @JsonCreator
+ public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<String, Integer> keyToIndexMapper,
+ @JsonProperty("partitionValues") List<List<String>> partitionValues) {
+ this.keyToIndexMapper = keyToIndexMapper;
+ this.partitionValues = partitionValues;
+ }
+
+ public HivePartitionHolder() {
+ this.keyToIndexMapper = new HashMap<>();
+ this.partitionValues = new ArrayList<>();
+ }
+
+ @JsonProperty
+ public Map<String, Integer> getKeyToIndexMapper() {
+ return keyToIndexMapper;
+ }
+
+ @JsonProperty
+ public List<List<String>> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Checks if the partition values already exist in the holder.
+ * If not, adds them to the holder and maps the key to the index of the newly added values.
+ * If the partition values already exist, maps the key to their existing index.
+ *
+ * @param key mapper key
+ * @param values partition values
+ */
+ public void add(String key, List<String> values) {
+ int index = partitionValues.indexOf(values);
+ if (index == -1) {
+ index = partitionValues.size();
+ partitionValues.add(values);
+
+ }
+ keyToIndexMapper.put(key, index);
+ }
+
+ /**
+ * Returns the list of partition values stored in the holder for the given key.
+ * If there are no corresponding partition values, returns an empty list.
+ *
+ * @param key mapper key
+ * @return list of partition values
+ */
+ public List<String> get(String key) {
+ Integer index = keyToIndexMapper.get(key);
+ if (index == null) {
+ return Collections.emptyList();
+ }
+ return partitionValues.get(index);
+ }
+
+}
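A brief usage sketch for the HivePartitionHolder added above (the file paths are illustrative, not taken from the test data): identical partition value lists are stored only once and shared across keys.

import org.apache.drill.exec.store.hive.HivePartitionHolder;

import java.util.Arrays;

public class HivePartitionHolderExample {
  public static void main(String[] args) {
    HivePartitionHolder holder = new HivePartitionHolder();

    // Two files from the same partition share a single stored value list.
    holder.add("/warehouse/kv/part_key=1/kv_1.parquet", Arrays.asList("1"));
    holder.add("/warehouse/kv/part_key=1/kv_1_copy.parquet", Arrays.asList("1"));
    holder.add("/warehouse/kv/part_key=2/kv_2.parquet", Arrays.asList("2"));

    System.out.println(holder.getPartitionValues().size());                  // 2 distinct value lists
    System.out.println(holder.get("/warehouse/kv/part_key=2/kv_2.parquet")); // [2]
    System.out.println(holder.get("/unknown/path.parquet"));                 // [] for an unknown key
  }
}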
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
new file mode 100644
index 000000000..23c67b574
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveDrillNativeParquetReader extends HiveTestBase {
+
+ @BeforeClass
+ public static void init() {
+ setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
+ setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+ resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+ }
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testFilterPushDownForManagedTable() throws Exception {
+ String query = "select * from hive.kv_native where key > 1";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ }
+
+ @Test
+ public void testFilterPushDownForExternalTable() throws Exception {
+ String query = "select * from hive.kv_native_ext where key = 1";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ }
+
+ @Test
+ public void testManagedPartitionPruning() throws Exception {
+ String query = "select * from hive.readtest_parquet where tinyint_part = 64";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ // Hive partition pruning is applied during the logical planning stage,
+ // while the conversion to the Drill native parquet reader happens during the physical stage,
+ // thus the plan should not contain a Filter
+ testPlanMatchingPatterns(query,
+ new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{"Filter"});
+ }
+
+ @Test
+ public void testExternalPartitionPruning() throws Exception {
+ String query = "select `key` from hive.kv_native_ext where part_key = 2";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ // Hive partition pruning is applied during the logical planning stage,
+ // while the conversion to the Drill native parquet reader happens during the physical stage,
+ // thus the plan should not contain a Filter
+ testPlanMatchingPatterns(query,
+ new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{"Filter"});
+ }
+
+ @Test
+ public void testSimpleStarSubQueryFilterPushDown() throws Exception {
+ String query = "select * from (select * from (select * from hive.kv_native)) where key > 1";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ }
+
+ @Test
+ public void testPartitionedExternalTable() throws Exception {
+ String query = "select * from hive.kv_native_ext";
+
+ testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=2"}, null);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("key", "part_key")
+ .baselineValues(1, 1)
+ .baselineValues(2, 1)
+ .baselineValues(3, 2)
+ .baselineValues(4, 2)
+ .go();
+ }
+
+ @Test
+ public void testEmptyTable() throws Exception {
+ String query = "select * from hive.empty_table";
+ // Hive reader should be chosen to output the schema
+ testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
+ }
+
+ @Test
+ public void testEmptyPartition() throws Exception {
+ String query = "select * from hive.kv_native_ext where part_key = 3";
+ // Hive reader should be chosen to output the schema
+ testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
+ }
+
+ @Test
+ public void testPhysicalPlanSubmission() throws Exception {
+ // checks only group scan
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native");
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext");
+ }
+
+ @Test
+ public void testProjectPushDownOptimization() throws Exception {
+ String query = "select boolean_field, int_part from hive.readtest_parquet";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ // during the scan the partition column is named like a Drill partition column (dir9),
+ // it will be renamed to the actual column name in the subsequent Project
+ new String[]{"Project\\(boolean_field=\\[\\$0\\], int_part=\\[CAST\\(\\$1\\):INTEGER\\]\\)",
+ "HiveDrillNativeParquetScan",
+ "columns=\\[`boolean_field`, `dir9`\\]"},
+ new String[]{});
+ }
+
+ @Test
+ public void testLimitPushDownOptimization() throws Exception {
+ String query = "select * from hive.kv_native limit 2";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ }
+
+ @Test
+ public void testConvertCountToDirectScanOptimization() throws Exception {
+ String query = "select count(1) as cnt from hive.kv_native";
+
+ testPlanMatchingPatterns(query, new String[]{"DynamicPojoRecordReader"}, null);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(8L)
+ .go();
+ }
+
+ @Test
+ public void testImplicitColumns() throws Exception {
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(CoreMatchers.allOf(containsString("VALIDATION ERROR"), containsString("not found in any table")));
+
+ test("select *, filename, fqn, filepath, suffix from hive.kv_native");
+ }
+
+ @Test // DRILL-3739
+ public void testReadingFromStorageHandleBasedTable() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from hive.kv_sh order by key limit 2")
+ .ordered()
+ .baselineColumns("key", "value")
+ .expectsEmptyResultSet()
+ .go();
+ }
+
+ @Test
+ public void testReadAllSupportedHiveDataTypesNativeParquet() throws Exception {
+ String query = "select * from hive.readtest_parquet";
+
+ testPlanMatchingPatterns(query, new String[] {"HiveDrillNativeParquetScan"}, null);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("binary_field", "boolean_field", "tinyint_field", "decimal0_field", "decimal9_field", "decimal18_field", "decimal28_field", "decimal38_field", "double_field", "float_field", "int_field", "bigint_field", "smallint_field", "string_field", "varchar_field", "timestamp_field", "char_field",
+ // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+ //"binary_part",
+ "boolean_part", "tinyint_part", "decimal0_part", "decimal9_part", "decimal18_part", "decimal28_part", "decimal38_part", "double_part", "float_part", "int_part", "bigint_part", "smallint_part", "string_part", "varchar_part", "timestamp_part", "date_part", "char_part")
+ .baselineValues("binaryfield".getBytes(), false, 34, new BigDecimal("66"), new BigDecimal("2347.92"), new BigDecimal("2758725827.99990"), new BigDecimal("29375892739852.8"), new BigDecimal("89853749534593985.783"), 8.345d, 4.67f, 123456, 234235L, 3455, "stringfield", "varcharfield", new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()), "charfield",
+ // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+ //"binary",
+ true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new BigDecimal("3289379872.94565"), new BigDecimal("39579334534534.4"), new BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456, 234235L, 3455, "string", "varchar", new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()), new DateTime(Date.valueOf("2013-07-05").getTime()), "char")
+ .baselineValues( // All fields are null, but partition fields have non-null values
+ null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+ // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+ //"binary",
+ true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new BigDecimal("3289379872.94565"), new BigDecimal("39579334534534.4"), new BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456, 234235L, 3455, "string", "varchar", new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()), new DateTime(Date.valueOf("2013-07-05").getTime()), "char")
+ .go();
+ }
+
+ @Test // DRILL-3938
+ public void testNativeReaderIsDisabledForAlteredPartitionedTable() throws Exception {
+ String query = "select key, `value`, newcol from hive.kv_parquet order by key limit 1";
+
+ // Make sure the HiveScan in plan has no native parquet reader
+ testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
+ }
+
+}
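
Note (editor): the negative tests above (e.g. testImplicitColumns) rely on JUnit 4's ExpectedException rule rather than try/catch assertions. Below is a minimal sketch of that pattern, not part of the patch; it assumes a BaseTestQuery-style test() helper, approximate import paths, and a hypothetical failing query.

    import static org.hamcrest.CoreMatchers.containsString;

    import org.apache.drill.common.exceptions.UserRemoteException;
    import org.apache.drill.test.BaseTestQuery;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.ExpectedException;

    public class ExpectedExceptionSketch extends BaseTestQuery {

      // The rule is declared once per class; each negative test arms it
      // with the expected exception type and message before running the query.
      @Rule
      public ExpectedException thrown = ExpectedException.none();

      @Test
      public void failsWithValidationError() throws Exception {
        thrown.expect(UserRemoteException.class);
        thrown.expectMessage(containsString("VALIDATION ERROR"));
        // hypothetical query that should fail during validation
        test("select filename from hive.kv_native");
      }
    }
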
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index ef5a16916..7583f42ff 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -120,25 +120,6 @@ public class TestHivePartitionPruning extends HiveTestBase {
assertFalse(plan.contains("Filter"));
}
- @Test
- public void pruneDataTypeSupportNativeReaders() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- final String query = "EXPLAIN PLAN FOR " +
- "SELECT * FROM hive.readtest_parquet WHERE tinyint_part = 64";
-
- final String plan = getPlanInString(query, OPTIQ_FORMAT);
-
- // Check and make sure that Filter is not present in the plan
- assertFalse(plan.contains("Filter"));
-
- // Make sure the plan contains the Hive scan utilizing native parquet reader
- assertTrue(plan.contains("groupscan=[HiveDrillNativeParquetScan"));
- } finally {
- test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- }
- }
-
@Test // DRILL-3579
public void selectFromPartitionedTableWithNullPartitions() throws Exception {
final String query = "SELECT count(*) nullCount FROM hive.partition_pruning_test " +
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 6d7ad136b..26a16b92f 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import org.apache.drill.categories.HiveStorageTest;
import org.apache.drill.exec.hive.HiveTestBase;
@@ -109,16 +108,4 @@ public class TestHiveProjectPushDown extends HiveTestBase {
testHelper(query, 1, expectedColNames);
}
- @Test
- public void projectPushDownOnHiveParquetTable() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- String query = "SELECT boolean_field, boolean_part, int_field, int_part FROM hive.readtest_parquet";
- String expectedColNames = "\"columns\" : [ \"`boolean_field`\", \"`dir0`\", \"`int_field`\", \"`dir9`\" ]";
-
- testHelper(query, 2, expectedColNames, "hive-drill-native-parquet-scan");
- } finally {
- test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- }
- }
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index e3d884b57..5758eec4a 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -31,7 +31,7 @@ public class HiveTestBase extends PlanTestBase {
@BeforeClass
public static void generateHive() throws Exception {
- hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher.getRootDir());
+ hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 4da22b6a3..1b7774878 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import java.math.BigDecimal;
import java.sql.Date;
@@ -42,38 +44,25 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@Category({SlowTest.class, HiveStorageTest.class})
public class TestHiveStorage extends HiveTestBase {
+
@BeforeClass
- public static void setupOptions() throws Exception {
- test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+ public static void init() {
+ setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
}
- @Test // DRILL-4083
- public void testNativeScanWhenNoColumnIsRead() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-
- String query = "SELECT count(*) as col FROM hive.countStar_Parquet";
- testPhysicalPlan(query, "hive-drill-native-parquet-scan");
-
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("col")
- .baselineValues(200L)
- .go();
- } finally {
- test("alter session reset `%s`",
- ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
- }
+ @AfterClass
+ public static void cleanup() {
+ resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
}
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@Test
public void hiveReadWithDb() throws Exception {
test("select * from hive.kv");
@@ -206,123 +195,6 @@ public class TestHiveStorage extends HiveTestBase {
.go();
}
- /**
- * Test to ensure Drill reads the all supported types through native Parquet readers.
- * NOTE: As part of Hive 1.2 upgrade, make sure this test and {@link #readAllSupportedHiveDataTypes()} are merged
- * into one test.
- */
- @Test
- public void readAllSupportedHiveDataTypesNativeParquet() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- final String query = "SELECT * FROM hive.readtest_parquet";
-
- // Make sure the plan has Hive scan with native parquet reader
- testPhysicalPlan(query, "hive-drill-native-parquet-scan");
-
- testBuilder().sqlQuery(query)
- .unOrdered()
- .baselineColumns(
- "binary_field",
- "boolean_field",
- "tinyint_field",
- "decimal0_field",
- "decimal9_field",
- "decimal18_field",
- "decimal28_field",
- "decimal38_field",
- "double_field",
- "float_field",
- "int_field",
- "bigint_field",
- "smallint_field",
- "string_field",
- "varchar_field",
- "timestamp_field",
- "char_field",
- // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
- //"binary_part",
- "boolean_part",
- "tinyint_part",
- "decimal0_part",
- "decimal9_part",
- "decimal18_part",
- "decimal28_part",
- "decimal38_part",
- "double_part",
- "float_part",
- "int_part",
- "bigint_part",
- "smallint_part",
- "string_part",
- "varchar_part",
- "timestamp_part",
- "date_part",
- "char_part")
- .baselineValues(
- "binaryfield".getBytes(),
- false,
- 34,
- new BigDecimal("66"),
- new BigDecimal("2347.92"),
- new BigDecimal("2758725827.99990"),
- new BigDecimal("29375892739852.8"),
- new BigDecimal("89853749534593985.783"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "stringfield",
- "varcharfield",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- "charfield",
- // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
- //"binary",
- true,
- 64,
- new BigDecimal("37"),
- new BigDecimal("36.90"),
- new BigDecimal("3289379872.94565"),
- new BigDecimal("39579334534534.4"),
- new BigDecimal("363945093845093890.900"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "string",
- "varchar",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- new DateTime(Date.valueOf("2013-07-05").getTime()),
- "char")
- .baselineValues( // All fields are null, but partition fields have non-null values
- null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
- // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
- //"binary",
- true,
- 64,
- new BigDecimal("37"),
- new BigDecimal("36.90"),
- new BigDecimal("3289379872.94565"),
- new BigDecimal("39579334534534.4"),
- new BigDecimal("363945093845093890.900"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "string",
- "varchar",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- new DateTime(Date.valueOf("2013-07-05").getTime()),
- "char")
- .go();
- } finally {
- test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- }
- }
-
@Test
public void orderByOnHiveTable() throws Exception {
testBuilder()
@@ -402,19 +274,7 @@ public class TestHiveStorage extends HiveTestBase {
.go();
}
- @Test // DRILL-3938
- public void nativeReaderIsDisabledForAlteredPartitionedTable() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- final String query = "EXPLAIN PLAN FOR SELECT key, `value`, newcol FROM hive.kv_parquet ORDER BY key LIMIT 1";
- // Make sure the HiveScan in plan has no native parquet reader
- final String planStr = getPlanInString(query, JSON_FORMAT);
- assertFalse("Hive native is not expected in the plan", planStr.contains("hive-drill-native-parquet-scan"));
- } finally {
- test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- }
- }
@Test // DRILL-3739
public void readingFromStorageHandleBasedTable() throws Exception {
@@ -426,22 +286,6 @@ public class TestHiveStorage extends HiveTestBase {
.go();
}
- @Test // DRILL-3739
- public void readingFromStorageHandleBasedTable2() throws Exception {
- try {
- test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-
- testBuilder()
- .sqlQuery("SELECT * FROM hive.kv_sh ORDER BY key LIMIT 2")
- .ordered()
- .baselineColumns("key", "value")
- .expectsEmptyResultSet()
- .go();
- } finally {
- test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
- }
- }
-
@Test // DRILL-3688
public void readingFromSmallTableWithSkipHeaderAndFooter() throws Exception {
testBuilder()
@@ -480,23 +324,20 @@ public class TestHiveStorage extends HiveTestBase {
.go();
}
- @Test // DRILL-3688
- public void testIncorrectHeaderFooterProperty() throws Exception {
- Map<String, String> testData = ImmutableMap.<String, String>builder()
- .put("hive.skipper.kv_incorrect_skip_header","skip.header.line.count")
- .put("hive.skipper.kv_incorrect_skip_footer", "skip.footer.line.count")
- .build();
-
- String query = "select * from %s";
- String exceptionMessage = "Hive table property %s value 'A' is non-numeric";
-
- for (Map.Entry<String, String> entry : testData.entrySet()) {
- try {
- test(String.format(query, entry.getKey()));
- } catch (UserRemoteException e) {
- assertThat(e.getMessage(), containsString(String.format(exceptionMessage, entry.getValue())));
- }
- }
+ @Test
+ public void testIncorrectHeaderProperty() throws Exception {
+ String query = "select * from hive.skipper.kv_incorrect_skip_header";
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(containsString("Hive table property skip.header.line.count value 'A' is non-numeric"));
+ test(query);
+ }
+
+ @Test
+ public void testIncorrectFooterProperty() throws Exception {
+ String query = "select * from hive.skipper.kv_incorrect_skip_footer";
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(containsString("Hive table property skip.footer.line.count value 'A' is non-numeric"));
+ test(query);
}
@Test
@@ -571,6 +412,7 @@ public class TestHiveStorage extends HiveTestBase {
@Test
public void testPhysicalPlanSubmission() throws Exception {
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv");
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.readtest");
}
private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
@@ -583,9 +425,4 @@ public class TestHiveStorage extends HiveTestBase {
assertTrue("Column should be nullable", columnMetadata.getIsNullable());
}
}
-
- @AfterClass
- public static void shutdownOptions() throws Exception {
- test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
- }
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 7f7a4011d..80da976d4 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -46,9 +46,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("hive.default", "kv")
.baselineValues("hive.default", "kv_parquet")
.baselineValues("hive.default", "kv_sh")
- .baselineValues("hive.default", "countstar_parquet")
.baselineValues("hive.default", "simple_json")
.baselineValues("hive.default", "partition_with_few_schemas")
+ .baselineValues("hive.default", "kv_native")
+ .baselineValues("hive.default", "kv_native_ext")
.go();
testBuilder()
@@ -249,9 +250,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("DRILL", "hive.default", "partition_pruning_test", "TABLE")
.baselineValues("DRILL", "hive.default", "partition_with_few_schemas", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_parquet", "TABLE")
- .baselineValues("DRILL", "hive.default", "countstar_parquet", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_sh", "TABLE")
.baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
+ .baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
+ .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
index 3706ff21f..64680c030 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
@@ -35,7 +35,7 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
@BeforeClass
public static void generateHive() throws Exception{
- hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher.getRootDir());
+ hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 78e5b393a..f20699998 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.sql.Date;
import java.sql.Timestamp;
@@ -31,6 +32,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Resources;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -52,9 +54,11 @@ public class HiveTestDataGenerator {
private final String dbDir;
private final String whDir;
+ private final BaseDirTestWatcher dirTestWatcher;
private final Map<String, String> config;
- public static synchronized HiveTestDataGenerator getInstance(File baseDir) throws Exception {
+ public static synchronized HiveTestDataGenerator getInstance(BaseDirTestWatcher dirTestWatcher) throws Exception {
+ File baseDir = dirTestWatcher.getRootDir();
if (instance == null || !HiveTestDataGenerator.baseDir.equals(baseDir)) {
HiveTestDataGenerator.baseDir = baseDir;
@@ -64,19 +68,20 @@ public class HiveTestDataGenerator {
final String dbDir = dbDirFile.getAbsolutePath();
final String whDir = whDirFile.getAbsolutePath();
- instance = new HiveTestDataGenerator(dbDir, whDir);
+ instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
instance.generateTestData();
}
return instance;
}
- private HiveTestDataGenerator(final String dbDir, final String whDir) {
+ private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
this.dbDir = dbDir;
this.whDir = whDir;
+ this.dirTestWatcher = dirTestWatcher;
config = Maps.newHashMap();
- config.put("hive.metastore.uris", "");
+ config.put(ConfVars.METASTOREURIS.toString(), "");
config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
config.put("hive.metastore.warehouse.dir", whDir);
config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
@@ -128,7 +133,7 @@ public class HiveTestDataGenerator {
try {
Files.setPosixFilePermissions(dir.toPath(), perms);
} catch (IOException e) {
- new RuntimeException(e);
+ throw new RuntimeException(e);
}
return dir;
@@ -494,22 +499,6 @@ public class HiveTestDataGenerator {
executeQuery(hiveDriver, "INSERT INTO TABLE kv_parquet PARTITION(part1) SELECT key, value, key FROM default.kv");
executeQuery(hiveDriver, "ALTER TABLE kv_parquet ADD COLUMNS (newcol string)");
- executeQuery(hiveDriver,
- "CREATE TABLE countStar_Parquet (int_field INT) STORED AS parquet");
-
- final int numOfRows = 200;
- final StringBuffer sb = new StringBuffer();
- sb.append("VALUES ");
- for(int i = 0; i < numOfRows; ++i) {
- if(i != 0) {
- sb.append(",");
- }
- sb.append("(").append(i).append(")");
- }
-
- executeQuery(hiveDriver, "INSERT INTO TABLE countStar_Parquet \n" +
- sb.toString());
-
// Create a StorageHandler based table (DRILL-3739)
executeQuery(hiveDriver, "CREATE TABLE kv_sh(key INT, value STRING) STORED BY " +
"'org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler'");
@@ -551,9 +540,60 @@ public class HiveTestDataGenerator {
Resources.getResource("simple.json") + "' into table default.simple_json";
executeQuery(hiveDriver, loadData);
+ createTestDataForDrillNativeParquetReaderTests(hiveDriver);
+
ss.close();
}
+ private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
+ // Hive managed table with data suitable for Drill native filter push down
+ executeQuery(hiveDriver, "create table kv_native(key int, sub_key int) stored as parquet");
+ // each insert statement creates a separate parquet file
+ executeQuery(hiveDriver, "insert into table kv_native values (1, 1), (1, 2)");
+ executeQuery(hiveDriver, "insert into table kv_native values (1, 3), (1, 4)");
+ executeQuery(hiveDriver, "insert into table kv_native values (2, 5), (2, 6)");
+ executeQuery(hiveDriver, "insert into table kv_native values (null, 9), (null, 10)");
+
+ // Hive external table with three partitions
+
+ // copy the external table data from test resources
+ dirTestWatcher.copyResourceToRoot(Paths.get("external"));
+
+ File external = new File(baseDir, "external");
+ String tableLocation = new File(external, "kv_native_ext").toURI().getPath();
+
+ executeQuery(hiveDriver, String.format("create external table kv_native_ext(key int) " +
+ "partitioned by (part_key int) " +
+ "stored as parquet location '%s'",
+ tableLocation));
+
+ /*
+ DATA:
+ key, part_key
+ 1, 1
+ 2, 1
+ 3, 2
+ 4, 2
+ */
+
+ // add partitions
+
+ // partition in the same location as the table
+ String firstPartition = new File(tableLocation, "part_key=1").toURI().getPath();
+ executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '1') " +
+ "location '%s'", firstPartition));
+
+ // partition in a different location from the table
+ String secondPartition = new File(external, "part_key=2").toURI().getPath();
+ executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '2') " +
+ "location '%s'", secondPartition));
+
+ // add empty partition
+ String thirdPartition = new File(dirTestWatcher.makeSubDir(Paths.get("empty_part")), "part_key=3").toURI().getPath();
+ executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '3') " +
+ "location '%s'", thirdPartition));
+ }
+
private File getTempFile() throws Exception {
return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile();
}
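
Note (editor): for reference, the on-disk layout produced by createTestDataForDrillNativeParquetReaderTests for the external table (paths relative to the test root directory; reconstructed from the statements and resource files above, so treat as illustrative):

    external/
      kv_native_ext/              <-- table location
        part_key=1/kv_1.parquet   <-- first partition, inside the table directory
      part_key=2/kv_2.parquet     <-- second partition, outside the table directory
    empty_part/
      part_key=3/                 <-- third partition, registered but containing no files
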
diff --git a/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet
new file mode 100755
index 000000000..f641402fa
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet
Binary files differ
diff --git a/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet
new file mode 100755
index 000000000..c34c26713
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet
Binary files differ