Diffstat (limited to 'contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java')
-rw-r--r-- | contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java | 204
1 file changed, 157 insertions, 47 deletions
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 202bd435e..03a80d338 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,94 +21,204 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.RowGroupInfo;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
-/**
- * Extension of {@link HiveScan} which support reading Hive tables using Drill's native parquet reader.
- */
 @JsonTypeName("hive-drill-native-parquet-scan")
-public class HiveDrillNativeParquetScan extends HiveScan {
+public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
+
+  private final HiveStoragePlugin hiveStoragePlugin;
+  private HivePartitionHolder hivePartitionHolder;
 
   @JsonCreator
-  public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
-                                    @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
+  public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
+                                    @JsonProperty("userName") String userName,
                                     @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
                                     @JsonProperty("columns") List<SchemaPath> columns,
-                                    @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry);
+                                    @JsonProperty("entries") List<ReadEntryWithPath> entries,
+                                    @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                    @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+    this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
+    this.hivePartitionHolder = hivePartitionHolder;
+
+    init();
   }
 
-  public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
-      List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+  public HiveDrillNativeParquetScan(String userName,
+                                    List<SchemaPath> columns,
+                                    HiveStoragePlugin hiveStoragePlugin,
+                                    List<LogicalInputSplit> logicalInputSplits) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
   }
 
-  public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
-    super(hiveScan);
+  public HiveDrillNativeParquetScan(String userName,
+                                    List<SchemaPath> columns,
+                                    HiveStoragePlugin hiveStoragePlugin,
+                                    List<LogicalInputSplit> logicalInputSplits,
+                                    LogicalExpression filter) throws IOException {
+    super(userName, columns, new ArrayList<>(), filter);
+
+    this.hiveStoragePlugin = hiveStoragePlugin;
+    this.hivePartitionHolder = new HivePartitionHolder();
+
+    for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
+      Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
+      // logical input split contains list of splits by files
+      // we need to read path of only one to get file path
+      assert iterator.hasNext();
+      InputSplit split = iterator.next();
+      assert split instanceof FileSplit;
+      FileSplit fileSplit = (FileSplit) split;
+      Path finalPath = fileSplit.getPath();
+      String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+      entries.add(new ReadEntryWithPath(pathString));
+
+      // store partition values per path
+      Partition partition = logicalInputSplit.getPartition();
+      if (partition != null) {
+        hivePartitionHolder.add(pathString, partition.getValues());
+      }
+    }
+
+    init();
   }
 
-  @Override
-  public ScanStats getScanStats() {
-    final ScanStats nativeHiveScanStats = super.getScanStats();
+  private HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
+    super(that);
+    this.hiveStoragePlugin = that.hiveStoragePlugin;
+    this.hivePartitionHolder = that.hivePartitionHolder;
+  }
 
-    // As Drill's native parquet record reader is faster and memory efficient. Divide the CPU cost
-    // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
-    return new ScanStats(
-        nativeHiveScanStats.getGroupScanProperty(),
-        nativeHiveScanStats.getRecordCount(),
-        nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(),
-        nativeHiveScanStats.getDiskCost());
+  @JsonProperty
+  public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+    return hiveStoragePlugin.getConfig();
   }
 
-  @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
-    try {
-      return new HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
-    } catch (IOException | ReflectiveOperationException e) {
-      throw new ExecutionSetupException(e);
-    }
+  @JsonProperty
+  public HivePartitionHolder getHivePartitionHolder() {
+    return hivePartitionHolder;
   }
 
   @Override
-  public boolean isNativeReader() {
-    return true;
+  public SubScan getSpecificScan(int minorFragmentId) {
+    List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+    HivePartitionHolder subPartitionHolder = new HivePartitionHolder();
+    for (RowGroupReadEntry readEntry : readEntries) {
+      List<String> values = hivePartitionHolder.get(readEntry.getPath());
+      subPartitionHolder.add(readEntry.getPath(), values);
+    }
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
     return new HiveDrillNativeParquetScan(this);
   }
 
   @Override
-  public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider());
+  public HiveDrillNativeParquetScan clone(FileSelection selection) throws IOException {
+    HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.init();
+    return newScan;
   }
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    final HiveDrillNativeParquetScan scan = new HiveDrillNativeParquetScan(this);
-    scan.columns = columns;
-    return scan;
+    HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+    newScan.columns = columns;
+    return newScan;
   }
 
   @Override
   public String toString() {
-    final List<HivePartitionWrapper> partitions = getHiveReadEntry().getHivePartitionWrappers();
-    int numPartitions = partitions == null ? 0 : partitions.size();
-    return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper()
-        + ", columns=" + getColumns()
-        + ", numPartitions=" + numPartitions
-        + ", partitions= " + partitions
-        + ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
+    StringBuilder builder = new StringBuilder();
+    builder.append("HiveDrillNativeParquetScan [");
+    builder.append("entries=").append(entries);
+    builder.append(", numFiles=").append(getEntries().size());
+    builder.append(", numRowGroups=").append(rowGroupInfos.size());
+
+    String filterString = getFilterString();
+    if (!filterString.isEmpty()) {
+      builder.append(", filter=").append(filterString);
+    }
+
+    builder.append(", columns=").append(columns);
+    builder.append("]");
+
+    return builder.toString();
  }
+
+  @Override
+  protected void initInternal() throws IOException {
+    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
+    Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
+    for (ReadEntryWithPath entry : entries) {
+      Path path = new Path(entry.getPath());
+      Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
+          new JobConf(hiveStoragePlugin.getHiveConf()),
+          path.getParent());
+      FileSystem fs = path.getFileSystem(conf);
+      fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+    }
+    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, formatConfig);
+  }
+
+  @Override
+  protected Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits() {
+    return hiveStoragePlugin.getContext().getBits();
+  }
+
+  @Override
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+    FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+    return clone(newSelection);
+  }
+
+  @Override
+  protected boolean supportsFileImplicitColumns() {
+    return false;
+  }
+
+  @Override
+  protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
+    return hivePartitionHolder.get(rowGroupInfo.getPath());
+  }
+
 }
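
Reviewer note: the HivePartitionHolder referenced throughout this diff is introduced in another file of the same change, so its implementation is not visible here. The sketch below is only a minimal illustration of the contract this scan appears to rely on: a de-duplicated mapping from file path to that file's Hive partition values, where many files typically share one partition value list. All names and method bodies in the sketch are assumptions for illustration, not the committed class, which among other things must also be Jackson-serializable.

// Illustrative sketch only -- approximates the behavior HiveDrillNativeParquetScan
// depends on above; the committed HivePartitionHolder may differ in naming and
// serialization details (omitted here for brevity).
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HivePartitionHolder {

  // Distinct partition value lists; many files usually point at the same entry.
  private final List<List<String>> partitionValues = new ArrayList<>();
  // File path -> index into partitionValues.
  private final Map<String, Integer> keyToIndex = new HashMap<>();

  public void add(String path, List<String> values) {
    int index = partitionValues.indexOf(values); // reuse an existing entry if equal
    if (index < 0) {
      index = partitionValues.size();
      partitionValues.add(values);
    }
    keyToIndex.put(path, index);
  }

  public List<String> get(String path) {
    Integer index = keyToIndex.get(path);
    return index == null ? Collections.<String>emptyList() : partitionValues.get(index);
  }
}

Under that reading, getSpecificScan above simply copies the path-to-values entries for the row groups assigned to a minor fragment into a smaller subPartitionHolder handed to HiveDrillNativeParquetRowGroupScan, and getPartitionValues is a plain lookup keyed by row-group path.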