diff options
author | vkorukanti <venki.korukanti@gmail.com> | 2015-01-19 09:27:20 -0800 |
---|---|---|
committer | vkorukanti <venki.korukanti@gmail.com> | 2015-01-19 22:09:05 -0800 |
commit | 49b60caa43a4277916525a2bc5b47d6ed636e383 (patch) | |
tree | 515fa2fb1f6e7b59f757dc796bba25e7942fda45 | |
parent | 7a136ae009d7c81f6d3532515028652a2da82806 (diff) |
DRILL-2034: Use the correct schema when creating FileSystem object in HiveScan
7 files changed, 166 insertions, 77 deletions
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index ddbc1003c..35db8ef31 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; @@ -129,51 +130,11 @@ public class HiveScan extends AbstractGroupScan { Table table = hiveReadEntry.getTable(); if (partitions == null || partitions.size() == 0) { Properties properties = MetaStoreUtils.getTableMetadata(table); - JobConf job = new JobConf(); - for (Object obj : properties.keySet()) { - job.set((String) obj, (String) properties.get(obj)); - } - for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { - job.set(entry.getKey(), entry.getValue()); - } - InputFormat<?, ?> format = (InputFormat<?, ?>) - Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance(); - job.setInputFormat(format.getClass()); - Path path = new Path(table.getSd().getLocation()); - FileSystem fs = FileSystem.get(job); - if (fs.exists(path)) { - FileInputFormat.addInputPath(job, path); - format = job.getInputFormat(); - for (InputSplit split : format.getSplits(job, 1)) { - inputSplits.add(split); - } - } - for (InputSplit split : inputSplits) { - partitionMap.put(split, null); - } + splitInput(properties, table.getSd(), null); } else { for (Partition partition : partitions) { Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table); - JobConf job = new JobConf(); - for (Object obj : properties.keySet()) { - job.set((String) obj, (String) properties.get(obj)); - } - for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { - job.set(entry.getKey(), entry.getValue()); - } - InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance(); - job.setInputFormat(format.getClass()); - Path path = new Path(partition.getSd().getLocation()); - FileSystem fs = FileSystem.get(job); - if (fs.exists(path)) { - FileInputFormat.addInputPath(job, path); - format = job.getInputFormat(); - InputSplit[] splits = format.getSplits(job, 1); - for (InputSplit split : splits) { - inputSplits.add(split); - partitionMap.put(split, partition); - } - } + splitInput(properties, partition.getSd(), partition); } } } catch (ReflectiveOperationException | IOException e) { @@ -181,6 +142,34 @@ public class HiveScan extends AbstractGroupScan { } } + /* Split the input given in StorageDescriptor */ + private void splitInput(Properties properties, StorageDescriptor sd, Partition partition) + throws ReflectiveOperationException, IOException { + JobConf job = new JobConf(); + for (Object obj : properties.keySet()) { + job.set((String) obj, (String) properties.get(obj)); + } + for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { + job.set(entry.getKey(), entry.getValue()); + } + InputFormat<?, ?> format = (InputFormat<?, ?>) + Class.forName(sd.getInputFormat()).getConstructor().newInstance(); + job.setInputFormat(format.getClass()); + Path path = new Path(sd.getLocation()); + FileSystem fs = path.getFileSystem(job); + + // Use new JobConf that has FS configuration + JobConf jobWithFsConf = new JobConf(fs.getConf()); + if (fs.exists(path)) { + FileInputFormat.addInputPath(jobWithFsConf, path); + format = jobWithFsConf.getInputFormat(); + for (InputSplit split : format.getSplits(jobWithFsConf, 1)) { + inputSplits.add(split); + partitionMap.put(split, partition); + } + } + } + @Override public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) { mappings = Lists.newArrayList(); diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java index b75f7d0ab..7353e050e 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java @@ -17,19 +17,11 @@ */ package org.apache.drill.exec; -import org.apache.drill.PlanTestBase; -import org.apache.drill.exec.store.hive.HiveTestDataGenerator; -import org.junit.BeforeClass; +import org.apache.drill.exec.hive.HiveTestBase; import org.junit.Ignore; import org.junit.Test; -public class TestHivePartitionPruning extends PlanTestBase { - - @BeforeClass - public static void generateHive() throws Exception{ - new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage()); - } - +public class TestHivePartitionPruning extends HiveTestBase { //Currently we do not have a good way to test plans so using a crude string comparison @Test public void testSimplePartitionFilter() throws Exception { diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java index 4479c8b30..0246f5431 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java @@ -19,18 +19,10 @@ package org.apache.drill.exec; import static org.junit.Assert.assertEquals; -import org.apache.drill.PlanTestBase; -import org.apache.drill.exec.store.hive.HiveTestDataGenerator; -import org.junit.BeforeClass; +import org.apache.drill.exec.hive.HiveTestBase; import org.junit.Test; -public class TestHiveProjectPushDown extends PlanTestBase { - - @BeforeClass - public static void generateHive() throws Exception{ - new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage()); - } - +public class TestHiveProjectPushDown extends HiveTestBase { private void testHelper(String query, String expectedColNamesInPlan, int expectedRecordCount)throws Exception { testPhysicalPlan(query, expectedColNamesInPlan); diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java index c6efcad8c..9ef766fe2 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java @@ -21,19 +21,12 @@ import static org.junit.Assert.assertTrue; import java.util.List; -import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.hive.HiveTestBase; import org.apache.drill.exec.rpc.user.QueryResultBatch; -import org.apache.drill.exec.store.hive.HiveTestDataGenerator; -import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -public class TestSampleHiveUDFs extends BaseTestQuery { - - @BeforeClass - public static void addStoragePlugins() throws Exception{ - new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage()); - } +public class TestSampleHiveUDFs extends HiveTestBase { private void helper(String query, String expected) throws Exception { List<QueryResultBatch> results = testSqlWithResults(query); diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java new file mode 100644 index 000000000..7e3b6c8b2 --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.hive; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.store.hive.HiveTestDataGenerator; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * Base class for Hive test. Takes care of generating and adding Hive test plugin before tests and deleting the + * plugin after tests. + */ +public class HiveTestBase extends PlanTestBase { + protected static HiveTestDataGenerator hiveTest; + + @BeforeClass + public static void generateHive() throws Exception{ + hiveTest = new HiveTestDataGenerator(bit.getContext().getStorage()); + hiveTest.createAndAddHiveTestPlugin(); + } + + @AfterClass + public static void cleanupHiveTestData() throws Exception{ + if (hiveTest != null) { + hiveTest.deleteHiveTestPlugin(); + } + } +} diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java new file mode 100644 index 000000000..a76128f6b --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.hive; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.FileSystem; +import org.junit.Test; + +public class TestHiveStorage extends HiveTestBase { + @Test + public void testQueryingTablesInNonDefaultFS() throws Exception { + // Update the default FS settings in Hive test storage plugin to non-local FS + hiveTest.updatePluginConfig(ImmutableMap.of(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9001")); + + testBuilder() + .sqlQuery("SELECT * FROM hive.`default`.kv LIMIT 1") + .unOrdered() + .baselineColumns("key", "value") + .baselineValues(1, " key_1") + .build() + .run(); + } +} diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index c55b71471..07587916a 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -26,6 +26,7 @@ import java.sql.Timestamp; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; @@ -40,13 +41,19 @@ public class HiveTestDataGenerator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveTestDataGenerator.class); static int RETRIES = 5; + private static final String HIVE_TEST_PLUGIN_NAME = "hive"; private Driver hiveDriver = null; private static final String DB_DIR = "/tmp/drill_hive_db"; private static final String WH_DIR = "/tmp/drill_hive_wh"; + private final StoragePluginRegistry pluginRegistry; - public static void main(String[] args) throws Exception { - HiveTestDataGenerator htd = new HiveTestDataGenerator(); - htd.generateTestData(); + public HiveTestDataGenerator(StoragePluginRegistry pluginRegistry) { + this.pluginRegistry = pluginRegistry; + } + + // TODO: Remove this once hive related tests in exec/jdbc are moved to contrib/storage-hive/core module + public HiveTestDataGenerator() { + this(null); } private void cleanDir(String dir) throws IOException{ @@ -57,7 +64,11 @@ public class HiveTestDataGenerator { } } - public void createAndAddHiveTestPlugin(StoragePluginRegistry pluginRegistry) throws Exception { + /** + * Create a Hive test storage plugin and add it to the plugin registry. + * @throws Exception + */ + public void createAndAddHiveTestPlugin() throws Exception { // generate test tables and data generateTestData(); @@ -66,13 +77,43 @@ public class HiveTestDataGenerator { config.put("hive.metastore.uris", ""); config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", DB_DIR)); config.put("hive.metastore.warehouse.dir", WH_DIR); - config.put("fs.default.name", "file:///"); + config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config); pluginConfig.setEnabled(true); - pluginRegistry.createOrUpdate("hive", pluginConfig, true); + pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, pluginConfig, true); + } + + /** + * Update the current HiveStoragePlugin with new config. + * + * @param configOverride + * @throws DrillException if fails to update or no plugin exists. + */ + public void updatePluginConfig(Map<String, String> configOverride) + throws DrillException { + HiveStoragePlugin storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(HIVE_TEST_PLUGIN_NAME); + if (storagePlugin == null) { + throw new DrillException( + "Hive test storage plugin doesn't exist. Add a plugin using createAndAddHiveTestPlugin()"); + } + + HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig(); + newPluginConfig.getHiveConfigOverride().putAll(configOverride); + + pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true); + } + + /** + * Delete the Hive test plugin from registry. + */ + public void deleteHiveTestPlugin() { + pluginRegistry.deletePlugin(HIVE_TEST_PLUGIN_NAME); } + // TODO: Make this method private once hive related tests in exec/jdbc are moved to contrib/storage-hive/core module. + // Tests in exec/jdbc just need the Hive metastore and test data and don't need adding storage plugin to registry. public void generateTestData() throws Exception { // remove data from previous runs. |