diff options
Diffstat (limited to 'exec/java-exec/src')
3 files changed, 46 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index e72f4a8b1..b49442785 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -47,6 +47,8 @@ import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MagicString; import org.apache.drill.exec.store.mock.MockStorageEngine; +import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata; +import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -206,12 +208,33 @@ public class ParquetFormatPlugin implements FormatPlugin{ // TODO: we only check the first file for directory reading. This is because if(selection.containsDirectories(fs)){ if(isDirReadable(fs, selection.getFirstPath(fs))){ - return new FormatSelection(plugin.getConfig(), selection.minusDirectories(fs)); + return new FormatSelection(plugin.getConfig(), expandSelection(fs, selection)); } } return super.isReadable(fs, selection); } + private FileSelection expandSelection(DrillFileSystem fs, FileSelection selection) throws IOException { + if (metaDataFileExists(fs, selection.getFirstPath(fs))) { + ParquetTableMetadata_v1 metadata = Metadata.getParquetTableMetadata(fs, getMetadataPath(selection.getFirstPath(fs)).toString()); + List<String> fileNames = Lists.newArrayList(); + for (ParquetFileMetadata file : metadata.files) { + fileNames.add(file.path); + } + return new FileSelection(fileNames, true); + } else { + return selection.minusDirectories(fs); + } + } + + private Path getMetadataPath(FileStatus dir) { + return new Path(dir.getPath(), Metadata.METADATA_FILENAME); + } + + private boolean metaDataFileExists(FileSystem fs, FileStatus dir) throws IOException { + return fs.exists(getMetadataPath(dir)); + } + boolean isDirReadable(DrillFileSystem fs, FileStatus dir) { Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE); try { @@ -219,7 +242,7 @@ public class ParquetFormatPlugin implements FormatPlugin{ return true; } else { - if (fs.exists(new Path(dir.getPath(), Metadata.METADATA_FILENAME))) { + if (metaDataFileExists(fs, dir)) { return true; } PathFilter filter = new DrillPathFilter(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a44daf2a5..8f68acc57 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -114,6 +114,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { private final DrillFileSystem fs; private final String selectionRoot; + private boolean usedMetadataCache = false; private List<EndpointAffinity> endpointAffinities; private List<SchemaPath> columns; private ListMultimap<Integer, RowGroupInfo> mappings; @@ -199,6 +200,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { this.columnTypeMap = that.columnTypeMap == null ? null : new HashMap(that.columnTypeMap); this.partitionValueMap = that.partitionValueMap == null ? null : new HashMap(that.partitionValueMap); this.fileSet = that.fileSet == null ? null : new HashSet(that.fileSet); + this.usedMetadataCache = that.usedMetadataCache; } @@ -490,25 +492,33 @@ public class ParquetGroupScan extends AbstractFileGroupScan { Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath())); Path metaPath = null; if (fs.isDirectory(p)) { - // Using the metadata file makes sense when querying a directory; otherwise - // if querying a single file we can look up the metadata directly from the file +// Using the metadata file makes sense when querying a directory; otherwise +// if querying a single file we can look up the metadata directly from the file metaPath = new Path(p, Metadata.METADATA_FILENAME); } if (metaPath != null && fs.exists(metaPath)) { + usedMetadataCache = true; parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString()); } else { + usedMetadataCache = false; parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString()); } } else { Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); Path metaPath = new Path(p, Metadata.METADATA_FILENAME); - if (fs.exists(metaPath) && fileSet != null) { - parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString())); + if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) { + usedMetadataCache = true; + if (fileSet != null) { + parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString())); + } else { + parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString()); + } } else { fileStatuses = Lists.newArrayList(); for (ReadEntryWithPath entry : entries) { getFiles(entry.getPath(), fileStatuses); } + usedMetadataCache = false; parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses); } } @@ -716,6 +726,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return "ParquetGroupScan [entries=" + entries + ", selectionRoot=" + selectionRoot + ", numFiles=" + getEntries().size() + + ", usedMetadataFile=" + usedMetadataCache + ", columns=" + columns + "]"; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java index 12158c979..4fc7ea887 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java @@ -34,7 +34,7 @@ import java.nio.file.Files; import static org.junit.Assert.assertEquals; -public class TestParquetMetadataCache extends BaseTestQuery { +public class TestParquetMetadataCache extends PlanTestBase { private static final String WORKING_PATH = TestTools.getWorkingPath(); private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; private static final String tableName = "parquetTable"; @@ -71,10 +71,13 @@ public class TestParquetMetadataCache extends BaseTestQuery { String tableName = "nation_ctas"; test("use dfs_test.tmp"); test(String.format("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName)); + test(String.format("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName)); test(String.format("refresh table metadata %s", tableName)); checkForMetadataFile(tableName); - int rowCount = testSql(String.format("select * from %s", tableName)); - Assert.assertEquals(25, rowCount); + String query = String.format("select * from %s", tableName); + int rowCount = testSql(query); + Assert.assertEquals(50, rowCount); + testPlanMatchingPatterns(query, new String[] { "usedMetadataFile=true" }, new String[]{}); } @Test |