aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: vkorukanti <venki.korukanti@gmail.com> 2015-01-19 09:27:20 -0800
committer: vkorukanti <venki.korukanti@gmail.com> 2015-01-19 22:09:05 -0800
commit: 49b60caa43a4277916525a2bc5b47d6ed636e383 (patch)
tree: 515fa2fb1f6e7b59f757dc796bba25e7942fda45
parent: 7a136ae009d7c81f6d3532515028652a2da82806 (diff)
DRILL-2034: Use the correct schema when creating FileSystem object in HiveScan
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java | 73
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java | 12
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java | 12
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java | 11
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java | 44
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java | 38
-rw-r--r-- contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java | 53
7 files changed, 166 insertions, 77 deletions
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index ddbc1003c..35db8ef31 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
@@ -129,51 +130,11 @@ public class HiveScan extends AbstractGroupScan {
Table table = hiveReadEntry.getTable();
if (partitions == null || partitions.size() == 0) {
Properties properties = MetaStoreUtils.getTableMetadata(table);
- JobConf job = new JobConf();
- for (Object obj : properties.keySet()) {
- job.set((String) obj, (String) properties.get(obj));
- }
- for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
- job.set(entry.getKey(), entry.getValue());
- }
- InputFormat<?, ?> format = (InputFormat<?, ?>)
- Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
- job.setInputFormat(format.getClass());
- Path path = new Path(table.getSd().getLocation());
- FileSystem fs = FileSystem.get(job);
- if (fs.exists(path)) {
- FileInputFormat.addInputPath(job, path);
- format = job.getInputFormat();
- for (InputSplit split : format.getSplits(job, 1)) {
- inputSplits.add(split);
- }
- }
- for (InputSplit split : inputSplits) {
- partitionMap.put(split, null);
- }
+ splitInput(properties, table.getSd(), null);
} else {
for (Partition partition : partitions) {
Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
- JobConf job = new JobConf();
- for (Object obj : properties.keySet()) {
- job.set((String) obj, (String) properties.get(obj));
- }
- for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
- job.set(entry.getKey(), entry.getValue());
- }
- InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
- job.setInputFormat(format.getClass());
- Path path = new Path(partition.getSd().getLocation());
- FileSystem fs = FileSystem.get(job);
- if (fs.exists(path)) {
- FileInputFormat.addInputPath(job, path);
- format = job.getInputFormat();
- InputSplit[] splits = format.getSplits(job, 1);
- for (InputSplit split : splits) {
- inputSplits.add(split);
- partitionMap.put(split, partition);
- }
- }
+ splitInput(properties, partition.getSd(), partition);
}
}
} catch (ReflectiveOperationException | IOException e) {
@@ -181,6 +142,34 @@ public class HiveScan extends AbstractGroupScan {
}
}
+  /**
+   * Computes the input splits for the data location named in the given storage descriptor and records
+   * them in {@code inputSplits} and {@code partitionMap}. If the location does not exist, nothing is added.
+   *
+   * @param properties table or partition metadata; every entry is copied into the job configuration
+   * @param sd storage descriptor supplying the input format class name and the data location
+   * @param partition partition each split is mapped to, or null when the table is read without partitions
+   * @throws ReflectiveOperationException if the input format class cannot be loaded or instantiated
+   * @throws IOException if the file system lookup, the existence check or the split computation fails
+   */
+  private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+      throws ReflectiveOperationException, IOException {
+    JobConf job = new JobConf();
+    for (Object obj : properties.keySet()) {
+      job.set((String) obj, (String) properties.get(obj));
+    }
+    // Plugin-level overrides are applied after the metadata, so they win on conflicting keys.
+    for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+      job.set(entry.getKey(), entry.getValue());
+    }
+    InputFormat<?, ?> format = (InputFormat<?, ?>)
+        Class.forName(sd.getInputFormat()).getConstructor().newInstance();
+    job.setInputFormat(format.getClass());
+
+    // Resolve the file system from the path rather than from the configured default file system.
+    Path path = new Path(sd.getLocation());
+    FileSystem fs = path.getFileSystem(job);
+
+    if (fs.exists(path)) {
+      // Compute the splits with the JobConf populated above. A JobConf rebuilt from fs.getConf() is not
+      // guaranteed to carry these settings: Hadoop may hand back a cached FileSystem instance whose
+      // Configuration was supplied by an earlier caller (e.g. a table with a different input format).
+      FileInputFormat.addInputPath(job, path);
+      format = job.getInputFormat();
+      for (InputSplit split : format.getSplits(job, 1)) {
+        inputSplits.add(split);
+        partitionMap.put(split, partition);
+      }
+    }
+  }
+
@Override
public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = Lists.newArrayList();
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index b75f7d0ab..7353e050e 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -17,19 +17,11 @@
*/
package org.apache.drill.exec;
-import org.apache.drill.PlanTestBase;
-import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
-import org.junit.BeforeClass;
+import org.apache.drill.exec.hive.HiveTestBase;
import org.junit.Ignore;
import org.junit.Test;
-public class TestHivePartitionPruning extends PlanTestBase {
-
- @BeforeClass
- public static void generateHive() throws Exception{
- new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage());
- }
-
+public class TestHivePartitionPruning extends HiveTestBase {
//Currently we do not have a good way to test plans so using a crude string comparison
@Test
public void testSimplePartitionFilter() throws Exception {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 4479c8b30..0246f5431 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -19,18 +19,10 @@ package org.apache.drill.exec;
import static org.junit.Assert.assertEquals;
-import org.apache.drill.PlanTestBase;
-import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
-import org.junit.BeforeClass;
+import org.apache.drill.exec.hive.HiveTestBase;
import org.junit.Test;
-public class TestHiveProjectPushDown extends PlanTestBase {
-
- @BeforeClass
- public static void generateHive() throws Exception{
- new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage());
- }
-
+public class TestHiveProjectPushDown extends HiveTestBase {
private void testHelper(String query, String expectedColNamesInPlan, int expectedRecordCount)throws Exception {
testPhysicalPlan(query, expectedColNamesInPlan);
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
index c6efcad8c..9ef766fe2 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
@@ -21,19 +21,12 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
-import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.hive.HiveTestBase;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
-import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-public class TestSampleHiveUDFs extends BaseTestQuery {
-
- @BeforeClass
- public static void addStoragePlugins() throws Exception{
- new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage());
- }
+public class TestSampleHiveUDFs extends HiveTestBase {
private void helper(String query, String expected) throws Exception {
List<QueryResultBatch> results = testSqlWithResults(query);
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
new file mode 100644
index 000000000..7e3b6c8b2
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Common parent for Hive tests: adds the Hive test storage plugin before the tests of a class run and
+ * deletes it again after they have finished.
+ */
+public class HiveTestBase extends PlanTestBase {
+  /** Generator that manages the Hive test plugin; assigned in {@link #generateHive()}. */
+  protected static HiveTestDataGenerator hiveTest;
+
+  /** Creates the generator on top of the storage registry obtained from {@code bit} and adds the plugin. */
+  @BeforeClass
+  public static void generateHive() throws Exception {
+    hiveTest = new HiveTestDataGenerator(bit.getContext().getStorage());
+    hiveTest.createAndAddHiveTestPlugin();
+  }
+
+  /** Deletes the Hive test plugin, provided the generator was created. */
+  @AfterClass
+  public static void cleanupHiveTestData() throws Exception {
+    if (hiveTest == null) {
+      return;
+    }
+    hiveTest.deleteHiveTestPlugin();
+  }
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
new file mode 100644
index 000000000..a76128f6b
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+/**
+ * Tests that query Hive tables through the Hive test storage plugin set up by {@link HiveTestBase}.
+ */
+public class TestHiveStorage extends HiveTestBase {
+
+  /**
+   * Overrides the plugin's default file system with a non-local URI, then queries
+   * {@code hive.`default`.kv} and expects a single row with key 1 and value " key_1".
+   */
+  @Test
+  public void testQueryingTablesInNonDefaultFS() throws Exception {
+    final ImmutableMap<String, String> nonLocalDefaultFs =
+        ImmutableMap.of(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9001");
+    final String query = "SELECT * FROM hive.`default`.kv LIMIT 1";
+
+    // Update the default FS settings in Hive test storage plugin to non-local FS
+    hiveTest.updatePluginConfig(nonLocalDefaultFs);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("key", "value")
+        .baselineValues(1, " key_1")
+        .build()
+        .run();
+  }
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index c55b71471..07587916a 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -26,6 +26,7 @@ import java.sql.Timestamp;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -40,13 +41,19 @@ public class HiveTestDataGenerator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveTestDataGenerator.class);
static int RETRIES = 5;
+ private static final String HIVE_TEST_PLUGIN_NAME = "hive";
private Driver hiveDriver = null;
private static final String DB_DIR = "/tmp/drill_hive_db";
private static final String WH_DIR = "/tmp/drill_hive_wh";
+ private final StoragePluginRegistry pluginRegistry;
- public static void main(String[] args) throws Exception {
- HiveTestDataGenerator htd = new HiveTestDataGenerator();
- htd.generateTestData();
+ /**
+ * Create a generator bound to the given storage plugin registry.
+ *
+ * @param pluginRegistry registry in which the Hive test storage plugin is created, updated and deleted
+ */
+ public HiveTestDataGenerator(StoragePluginRegistry pluginRegistry) {
+ this.pluginRegistry = pluginRegistry;
+ }
+
+ // TODO: Remove this once hive related tests in exec/jdbc are moved to contrib/storage-hive/core module
+ /**
+ * Create a generator without a storage plugin registry. The registry is left null, so the methods that
+ * use it (createAndAddHiveTestPlugin, updatePluginConfig, deleteHiveTestPlugin) cannot be used on an
+ * instance created this way.
+ */
+ public HiveTestDataGenerator() {
+ this(null);
}
private void cleanDir(String dir) throws IOException{
@@ -57,7 +64,11 @@ public class HiveTestDataGenerator {
}
}
- public void createAndAddHiveTestPlugin(StoragePluginRegistry pluginRegistry) throws Exception {
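+ /**
+ * Generate the Hive test tables and data, then create a Hive test storage plugin and add it to the plugin registry.
+ * @throws Exception if generating the test data or adding the plugin to the registry fails
+ */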
+ public void createAndAddHiveTestPlugin() throws Exception {
// generate test tables and data
generateTestData();
@@ -66,13 +77,43 @@ public class HiveTestDataGenerator {
config.put("hive.metastore.uris", "");
config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", DB_DIR));
config.put("hive.metastore.warehouse.dir", WH_DIR);
- config.put("fs.default.name", "file:///");
+ config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+
HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
pluginConfig.setEnabled(true);
- pluginRegistry.createOrUpdate("hive", pluginConfig, true);
+ pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, pluginConfig, true);
+ }
+
+  /**
+   * Merges the given settings into the Hive config overrides of the registered Hive test storage plugin
+   * and hands the resulting config back to the plugin registry.
+   *
+   * @param configOverride settings to put into the plugin's Hive config overrides
+   * @throws DrillException if no Hive test storage plugin is registered, or if the registry update fails
+   */
+  public void updatePluginConfig(Map<String, String> configOverride)
+      throws DrillException {
+    final HiveStoragePlugin hivePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(HIVE_TEST_PLUGIN_NAME);
+    if (hivePlugin == null) {
+      throw new DrillException(
+          "Hive test storage plugin doesn't exist. Add a plugin using createAndAddHiveTestPlugin()");
+    }
+
+    // The overrides are added to the config object returned by the plugin, which is then passed to the registry.
+    final HiveStoragePluginConfig pluginConfig = hivePlugin.getConfig();
+    pluginConfig.getHiveConfigOverride().putAll(configOverride);
+    pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, pluginConfig, true);
+  }
+
+ /**
+ * Delete the storage plugin registered under HIVE_TEST_PLUGIN_NAME from the plugin registry.
+ * Uses the registry passed to the constructor, so it must not be called on an instance created
+ * with the no-argument constructor (which leaves the registry null).
+ */
+ public void deleteHiveTestPlugin() {
+ pluginRegistry.deletePlugin(HIVE_TEST_PLUGIN_NAME);
}
+ // TODO: Make this method private once hive related tests in exec/jdbc are moved to contrib/storage-hive/core module.
+ // Tests in exec/jdbc just need the Hive metastore and test data and don't need adding storage plugin to registry.
public void generateTestData() throws Exception {
// remove data from previous runs.