Diffstat (limited to 'contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java')
-rw-r--r--  contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java | 204
1 file changed, 157 insertions(+), 47 deletions(-)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 202bd435e..03a80d338 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,94 +21,204 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.RowGroupInfo;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
-/**
- * Extension of {@link HiveScan} which support reading Hive tables using Drill's native parquet reader.
- */
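+/**
+ * Extension of {@link AbstractParquetGroupScan} which reads Hive tables using Drill's native Parquet reader.
+ */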
@JsonTypeName("hive-drill-native-parquet-scan")
-public class HiveDrillNativeParquetScan extends HiveScan {
+public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
+
+ private final HiveStoragePlugin hiveStoragePlugin;
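+ // maps each data file path to the list of Hive partition values it belongs to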
+ private HivePartitionHolder hivePartitionHolder;
@JsonCreator
- public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
- @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
+ public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
+ @JsonProperty("userName") String userName,
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
- super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry);
+ @JsonProperty("entries") List<ReadEntryWithPath> entries,
+ @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+ @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
+ super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+ this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
+ this.hivePartitionHolder = hivePartitionHolder;
+
+ init();
}
- public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
- List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
- super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+ public HiveDrillNativeParquetScan(String userName,
+ List<SchemaPath> columns,
+ HiveStoragePlugin hiveStoragePlugin,
+ List<LogicalInputSplit> logicalInputSplits) throws IOException {
+ this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
}
- public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
- super(hiveScan);
+ public HiveDrillNativeParquetScan(String userName,
+ List<SchemaPath> columns,
+ HiveStoragePlugin hiveStoragePlugin,
+ List<LogicalInputSplit> logicalInputSplits,
+ LogicalExpression filter) throws IOException {
+ super(userName, columns, new ArrayList<>(), filter);
+
+ this.hiveStoragePlugin = hiveStoragePlugin;
+ this.hivePartitionHolder = new HivePartitionHolder();
+
+ for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
+ Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
+ // a logical input split contains a list of splits for the same file;
+ // reading the path of the first split is enough to get the file path
+ assert iterator.hasNext();
+ InputSplit split = iterator.next();
+ assert split instanceof FileSplit;
+ FileSplit fileSplit = (FileSplit) split;
+ Path finalPath = fileSplit.getPath();
+ String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+ entries.add(new ReadEntryWithPath(pathString));
+
+ // store partition values per path
+ Partition partition = logicalInputSplit.getPartition();
+ if (partition != null) {
+ hivePartitionHolder.add(pathString, partition.getValues());
+ }
+ }
+
+ init();
}
- @Override
- public ScanStats getScanStats() {
- final ScanStats nativeHiveScanStats = super.getScanStats();
+ private HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
+ super(that);
+ this.hiveStoragePlugin = that.hiveStoragePlugin;
+ this.hivePartitionHolder = that.hivePartitionHolder;
+ }
- // As Drill's native parquet record reader is faster and memory efficient. Divide the CPU cost
- // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
- return new ScanStats(
- nativeHiveScanStats.getGroupScanProperty(),
- nativeHiveScanStats.getRecordCount(),
- nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(),
- nativeHiveScanStats.getDiskCost());
+ @JsonProperty
+ public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+ return hiveStoragePlugin.getConfig();
}
- @Override
- public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
- try {
- return new HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
- } catch (IOException | ReflectiveOperationException e) {
- throw new ExecutionSetupException(e);
- }
+ @JsonProperty
+ public HivePartitionHolder getHivePartitionHolder() {
+ return hivePartitionHolder;
}
@Override
- public boolean isNativeReader() {
- return true;
+ public SubScan getSpecificScan(int minorFragmentId) {
+ List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
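+ // keep only the partition values for the paths read by this minor fragment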
+ HivePartitionHolder subPartitionHolder = new HivePartitionHolder();
+ for (RowGroupReadEntry readEntry : readEntries) {
+ List<String> values = hivePartitionHolder.get(readEntry.getPath());
+ subPartitionHolder.add(readEntry.getPath(), values);
+ }
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
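+ // a group scan has no children; returning a copy is sufficient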
return new HiveDrillNativeParquetScan(this);
}
@Override
- public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider());
+ public HiveDrillNativeParquetScan clone(FileSelection selection) throws IOException {
+ HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+ newScan.modifyFileSelection(selection);
+ newScan.init();
+ return newScan;
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
- final HiveDrillNativeParquetScan scan = new HiveDrillNativeParquetScan(this);
- scan.columns = columns;
- return scan;
+ HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+ newScan.columns = columns;
+ return newScan;
}
@Override
public String toString() {
- final List<HivePartitionWrapper> partitions = getHiveReadEntry().getHivePartitionWrappers();
- int numPartitions = partitions == null ? 0 : partitions.size();
- return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper()
- + ", columns=" + getColumns()
- + ", numPartitions=" + numPartitions
- + ", partitions= " + partitions
- + ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
+ StringBuilder builder = new StringBuilder();
+ builder.append("HiveDrillNativeParquetScan [");
+ builder.append("entries=").append(entries);
+ builder.append(", numFiles=").append(getEntries().size());
+ builder.append(", numRowGroups=").append(rowGroupInfos.size());
+
+ String filterString = getFilterString();
+ if (!filterString.isEmpty()) {
+ builder.append(", filter=").append(filterString);
+ }
+
+ builder.append(", columns=").append(columns);
+ builder.append("]");
+
+ return builder.toString();
}
+
+ @Override
+ protected void initInternal() throws IOException {
+ ParquetFormatConfig formatConfig = new ParquetFormatConfig();
+ Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
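+ // resolve the FileSystem for each entry using the Hive configuration pushed down for its parent directory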
+ for (ReadEntryWithPath entry : entries) {
+ Path path = new Path(entry.getPath());
+ Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
+ new JobConf(hiveStoragePlugin.getHiveConf()),
+ path.getParent());
+ FileSystem fs = path.getFileSystem(conf);
+ fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+ }
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, formatConfig);
+ }
+
+ @Override
+ protected Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits() {
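+ // cluster endpoints, used when computing row group affinity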
+ return hiveStoragePlugin.getContext().getBits();
+ }
+
+ @Override
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
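+ // build a selection from bare file paths; clone() re-runs init() to refresh the Parquet metadata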
+ FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+ return clone(newSelection);
+ }
+
+ @Override
+ protected boolean supportsFileImplicitColumns() {
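+ // implicit file columns are not exposed; partition values come from hivePartitionHolder instead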
+ return false;
+ }
+
+ @Override
+ protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
+ return hivePartitionHolder.get(rowGroupInfo.getPath());
+ }
+
}