aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBohdan Kazydub <bohdan.kazydub@gmail.com>2018-12-14 19:42:51 +0200
committerGautam Parai <gparai@apache.org>2019-01-03 16:35:30 -0800
commita9331361c72d47c98ae16087e865bdf61eb01d96 (patch)
tree21ea84d195020b4ae6e73e20dab9eb18c95195ff
parentf687da853d8a35c3b34d9efd05f85bc37d69d14d (diff)
DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled
- provided JsonRecordWriter, ParquetRecordWriter and DrillTextRecordWriter with file system configuration closes #1576
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java9
6 files changed, 15 insertions, 27 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 721e80002..11dc20421 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -33,13 +33,11 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -72,20 +70,17 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
-
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
options.put("prefix", fragmentId);
-
options.put("separator", " ");
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
options.put("extension", "json");
options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
- RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
+
+ RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy(), getFsConf());
recordWriter.init(options);
return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 9e6aaf8d9..2e80b3ffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -64,8 +64,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
// Record write status
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
- public JsonRecordWriter(StorageStrategy storageStrategy){
+ private Configuration fsConf;
+
+ public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+ this.fsConf = new Configuration(fsConf);
}
@Override
@@ -78,9 +81,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify"));
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
- this.fs = FileSystem.get(conf);
+ this.fs = FileSystem.get(fsConf);
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bc129ae1d..2ac24d8d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
@@ -50,7 +49,6 @@ import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -117,17 +115,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
final Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
-
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
options.put("prefix", fragmentId);
-
options.put("separator", getConfig().getFieldDelimiterAsString());
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
options.put("extension", getConfig().getExtensions().get(0));
- RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
+ RecordWriter recordWriter = new DrillTextRecordWriter(
+ context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
recordWriter.init(options);
return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 2c409963a..f46cc1cf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,7 +47,6 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.util.DrillFileSystemUtil;
@@ -140,8 +139,6 @@ public class ParquetFormatPlugin implements FormatPlugin {
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
options.put("prefix", fragmentId);
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45233c4da..5a64f4070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -132,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy();
this.cleanUpLocations = Lists.newArrayList();
+ this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
}
@Override
@@ -139,8 +140,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
- conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
fs = FileSystem.get(conf);
blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 7b7c47fe4..83a00bd17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -56,9 +56,12 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
private StringBuilder currentRecord; // contains the current record separated by field delimiter
- public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
+ private Configuration fsConf;
+
+ public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
super(allocator);
this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+ this.fsConf = new Configuration(fsConf);
}
@Override
@@ -68,9 +71,7 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
this.fieldDelimiter = writerOptions.get("separator");
this.extension = writerOptions.get("extension");
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
- this.fs = FileSystem.get(conf);
+ this.fs = FileSystem.get(fsConf);
this.currentRecord = new StringBuilder();
this.index = 0;