diff options
author | Bohdan Kazydub <bohdan.kazydub@gmail.com> | 2018-12-14 19:42:51 +0200 |
---|---|---|
committer | Gautam Parai <gparai@apache.org> | 2019-01-03 16:35:30 -0800 |
commit | a9331361c72d47c98ae16087e865bdf61eb01d96 (patch) | |
tree | 21ea84d195020b4ae6e73e20dab9eb18c95195ff | |
parent | f687da853d8a35c3b34d9efd05f85bc37d69d14d (diff) |
DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled
- provided JsonRecordWriter, ParquetRecordWriter and DrillTextRecordWriter with the file system configuration
closes #1576
6 files changed, 15 insertions, 27 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 721e80002..11dc20421 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -33,13 +33,11 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyWriter; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -72,20 +70,17 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { Map<String, String> options = new HashMap<>(); options.put("location", writer.getLocation()); - FragmentHandle handle = context.getHandle(); String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); options.put("prefix", fragmentId); - options.put("separator", " "); - options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection()); - options.put("extension", "json"); options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES))); options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY))); options.put("skipnulls", 
Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS))); options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR))); - RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy()); + + RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy(), getFsConf()); recordWriter.init(options); return recordWriter; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java index 9e6aaf8d9..2e80b3ffb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java @@ -64,8 +64,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr // Record write status private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called - public JsonRecordWriter(StorageStrategy storageStrategy){ + private Configuration fsConf; + + public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) { this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy; + this.fsConf = new Configuration(fsConf); } @Override @@ -78,9 +81,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls")); final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify")); - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); - this.fs = FileSystem.get(conf); + this.fs = FileSystem.get(fsConf); Path fileName = new Path(location, prefix + "_" + index + "." 
+ extension); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index bc129ae1d..2ac24d8d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; import org.apache.drill.exec.store.dfs.easy.EasyWriter; @@ -50,7 +49,6 @@ import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.drill.exec.store.text.DrillTextRecordReader; import org.apache.drill.exec.store.text.DrillTextRecordWriter; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; @@ -117,17 +115,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm final Map<String, String> options = new HashMap<>(); options.put("location", writer.getLocation()); - FragmentHandle handle = context.getHandle(); String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); options.put("prefix", fragmentId); - options.put("separator", getConfig().getFieldDelimiterAsString()); - options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection()); - options.put("extension", getConfig().getExtensions().get(0)); - RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), 
writer.getStorageStrategy()); + RecordWriter recordWriter = new DrillTextRecordWriter( + context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); recordWriter.init(options); return recordWriter; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 2c409963a..f46cc1cf2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -47,7 +47,6 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.BasicFormatMatcher; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.util.DrillFileSystemUtil; @@ -140,8 +139,6 @@ public class ParquetFormatPlugin implements FormatPlugin { String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); options.put("prefix", fragmentId); - options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection()); - options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString()); options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK, context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 45233c4da..5a64f4070 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -132,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION)); this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy(); this.cleanUpLocations = Lists.newArrayList(); + this.conf = new Configuration(writer.getFormatPlugin().getFsConf()); } @Override @@ -139,8 +140,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { this.location = writerOptions.get("location"); this.prefix = writerOptions.get("prefix"); - conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); fs = FileSystem.get(conf); blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE)); pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java index 7b7c47fe4..83a00bd17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java @@ -56,9 +56,12 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called private StringBuilder currentRecord; // contains the current record separated by field delimiter - public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) { + private Configuration fsConf; + + public DrillTextRecordWriter(BufferAllocator 
allocator, StorageStrategy storageStrategy, Configuration fsConf) { super(allocator); this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy; + this.fsConf = new Configuration(fsConf); } @Override @@ -68,9 +71,7 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { this.fieldDelimiter = writerOptions.get("separator"); this.extension = writerOptions.get("extension"); - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); - this.fs = FileSystem.get(conf); + this.fs = FileSystem.get(fsConf); this.currentRecord = new StringBuilder(); this.index = 0; |