Diffstat (limited to 'exec/java-exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java        | 27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java           | 19
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java   |  9
3 files changed, 46 insertions(+), 9 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e72f4a8b1..b49442785 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,6 +47,8 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -206,12 +208,33 @@ public class ParquetFormatPlugin implements FormatPlugin{
// TODO: we only check the first file for directory reading. This is because
if(selection.containsDirectories(fs)){
if(isDirReadable(fs, selection.getFirstPath(fs))){
- return new FormatSelection(plugin.getConfig(), selection.minusDirectories(fs));
+ return new FormatSelection(plugin.getConfig(), expandSelection(fs, selection));
}
}
return super.isReadable(fs, selection);
}
+ private FileSelection expandSelection(DrillFileSystem fs, FileSelection selection) throws IOException {
+ if (metaDataFileExists(fs, selection.getFirstPath(fs))) {
+ ParquetTableMetadata_v1 metadata = Metadata.getParquetTableMetadata(fs, getMetadataPath(selection.getFirstPath(fs)).toString());
+ List<String> fileNames = Lists.newArrayList();
+ for (ParquetFileMetadata file : metadata.files) {
+ fileNames.add(file.path);
+ }
+ return new FileSelection(fileNames, true);
+ } else {
+ return selection.minusDirectories(fs);
+ }
+ }
+
+ private Path getMetadataPath(FileStatus dir) {
+ return new Path(dir.getPath(), Metadata.METADATA_FILENAME);
+ }
+
+ private boolean metaDataFileExists(FileSystem fs, FileStatus dir) throws IOException {
+ return fs.exists(getMetadataPath(dir));
+ }
+
boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
try {
@@ -219,7 +242,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
return true;
} else {
- if (fs.exists(new Path(dir.getPath(), Metadata.METADATA_FILENAME))) {
+ if (metaDataFileExists(fs, dir)) {
return true;
}
PathFilter filter = new DrillPathFilter();
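
Note: the hunk above short-circuits directory expansion when a Parquet metadata cache file already sits at the selection root. Below is a minimal standalone sketch of that existence check using only the Hadoop FileSystem API; the class and method names are illustrative, not part of the patch, and the literal file name is an assumed value of Metadata.METADATA_FILENAME.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative only: mirrors the metaDataFileExists()/getMetadataPath()
 * helpers added in the hunk above.
 */
public class MetadataCacheCheck {
  private static final String METADATA_FILENAME = ".drill.parquet_metadata"; // assumed cache file name

  public static boolean metadataFileExists(FileSystem fs, Path dir) throws IOException {
    // The cache file, when present, sits directly under the selection root.
    return fs.exists(new Path(dir, METADATA_FILENAME));
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    System.out.println("metadata cache present: " + metadataFileExists(fs, new Path(args[0])));
  }
}

When the file is found, expandSelection() builds the FileSelection straight from metadata.files instead of walking the directory tree via minusDirectories().
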
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a44daf2a5..8f68acc57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -114,6 +114,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private final DrillFileSystem fs;
private final String selectionRoot;
+ private boolean usedMetadataCache = false;
private List<EndpointAffinity> endpointAffinities;
private List<SchemaPath> columns;
private ListMultimap<Integer, RowGroupInfo> mappings;
@@ -199,6 +200,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.columnTypeMap = that.columnTypeMap == null ? null : new HashMap(that.columnTypeMap);
this.partitionValueMap = that.partitionValueMap == null ? null : new HashMap(that.partitionValueMap);
this.fileSet = that.fileSet == null ? null : new HashSet(that.fileSet);
+ this.usedMetadataCache = that.usedMetadataCache;
}
@@ -490,25 +492,33 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
Path metaPath = null;
if (fs.isDirectory(p)) {
- // Using the metadata file makes sense when querying a directory; otherwise
- // if querying a single file we can look up the metadata directly from the file
+// Using the metadata file makes sense when querying a directory; otherwise
+// if querying a single file we can look up the metadata directly from the file
metaPath = new Path(p, Metadata.METADATA_FILENAME);
}
if (metaPath != null && fs.exists(metaPath)) {
+ usedMetadataCache = true;
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
} else {
+ usedMetadataCache = false;
parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
}
} else {
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
- if (fs.exists(metaPath) && fileSet != null) {
- parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
+ if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
+ usedMetadataCache = true;
+ if (fileSet != null) {
+ parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
+ } else {
+ parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+ }
} else {
fileStatuses = Lists.newArrayList();
for (ReadEntryWithPath entry : entries) {
getFiles(entry.getPath(), fileStatuses);
}
+ usedMetadataCache = false;
parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses);
}
}
@@ -716,6 +726,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return "ParquetGroupScan [entries=" + entries
+ ", selectionRoot=" + selectionRoot
+ ", numFiles=" + getEntries().size()
+ + ", usedMetadataFile=" + usedMetadataCache
+ ", columns=" + columns + "]";
}
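
Note: the group scan now records whether the cache file was consulted and surfaces it in toString() as usedMetadataFile=..., which is the string the test below matches against the text plan. A condensed, hypothetical restatement of the new cache-vs-scan condition follows; only fs.isDirectory()/fs.exists() and the cache file location come from the hunk, the helper itself is illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative helper, not part of the patch. */
final class CacheDecision {
  private static final String METADATA_FILENAME = ".drill.parquet_metadata"; // assumed cache file name

  /**
   * True when the scan should read the cache file instead of individual
   * footers: the selection root must be a directory and the cache file must
   * exist there. Single-file selections fall through, read their own footer,
   * and leave the usedMetadataCache flag at false.
   */
  static boolean shouldUseMetadataCache(FileSystem fs, Path selectionRoot) throws IOException {
    return fs.isDirectory(selectionRoot)
        && fs.exists(new Path(selectionRoot, METADATA_FILENAME));
  }
}
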
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 12158c979..4fc7ea887 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -34,7 +34,7 @@ import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
-public class TestParquetMetadataCache extends BaseTestQuery {
+public class TestParquetMetadataCache extends PlanTestBase {
private static final String WORKING_PATH = TestTools.getWorkingPath();
private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
private static final String tableName = "parquetTable";
@@ -71,10 +71,13 @@ public class TestParquetMetadataCache extends BaseTestQuery {
String tableName = "nation_ctas";
test("use dfs_test.tmp");
test(String.format("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName));
+ test(String.format("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName));
test(String.format("refresh table metadata %s", tableName));
checkForMetadataFile(tableName);
- int rowCount = testSql(String.format("select * from %s", tableName));
- Assert.assertEquals(25, rowCount);
+ String query = String.format("select * from %s", tableName);
+ int rowCount = testSql(query);
+ Assert.assertEquals(50, rowCount);
+ testPlanMatchingPatterns(query, new String[] { "usedMetadataFile=true" }, new String[]{});
}
@Test
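
Note: outside the test harness, the same flag can be inspected from any client by explaining the query after a metadata refresh. A rough JDBC sketch follows; the connection URL, workspace, and table names are assumptions carried over from the test setup above and will differ per environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ExplainMetadataCache {
  public static void main(String[] args) throws Exception {
    // Assumed local/embedded connection URL; adjust for a real cluster.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement stmt = conn.createStatement()) {
      stmt.execute("use dfs_test.tmp");
      stmt.execute("refresh table metadata nation_ctas");
      try (ResultSet rs = stmt.executeQuery("explain plan for select * from nation_ctas")) {
        while (rs.next()) {
          // The text plan embeds ParquetGroupScan.toString(), so a cache hit
          // shows up as "usedMetadataFile=true".
          System.out.println(rs.getString(1));
        }
      }
    }
  }
}
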