diff options
author | Alan Gates <gates@hortonworks.com> | 2016-11-07 15:39:54 -0800 |
---|---|---|
committer | Roman Shaposhnik <rvs@apache.org> | 2017-03-23 10:27:12 -0700 |
commit | c313795409472dfffda49a4ffcb6dc6c59f9c5a9 (patch) | |
tree | 7320774dfcb701dc3bdc2673c957ba1218bcf348 /bigtop-tests | |
parent | 241c8397215fe42e9e0b3881814b98071065325d (diff) |
Progress so far. Doesn't work yet, but committing to avoid another data loss.
(cherry picked from commit 18ee8453c11e7fd7c25af75e6c403753db11d5f5)
Diffstat (limited to 'bigtop-tests')
5 files changed, 359 insertions, 2 deletions
diff --git a/bigtop-tests/spec-tests/runtime/build.gradle b/bigtop-tests/spec-tests/runtime/build.gradle index 55055506..f0166c94 100644 --- a/bigtop-tests/spec-tests/runtime/build.gradle +++ b/bigtop-tests/spec-tests/runtime/build.gradle @@ -17,6 +17,8 @@ */ def junitVersion = '4.11' +apply plugin: 'java' + repositories { maven { url "http://conjars.org/repo/" @@ -31,8 +33,12 @@ dependencies { compile group: 'org.apache.hive', name: 'hive-common', version: '1.2.1' compile group: 'org.apache.thrift', name: 'libfb303', version: '0.9.3' compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3' - testCompile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2' + compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2' + compile group: 'org.apache.hive.hcatalog', name: 'hive-hcatalog-core', version: '1.2.1' testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.2' + compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-jobclient', version: '2.7.2' + testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-common', version: '2.7.2' + testCompile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.2' testCompile group: 'org.apache.hive', name: 'hive-exec', version: '1.2.1' if (System.env.HADOOP_CONF_DIR) testRuntime files(System.env.HADOOP_CONF_DIR) } diff --git a/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java new file mode 100644 index 00000000..4a733d67 --- /dev/null +++ b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.odpi.specs.runtime.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; + +import java.io.IOException; +import java.net.URI; +import java.util.StringTokenizer; + +public class HCatalogMR extends Configured implements Tool { + private final static String INPUT_SCHEMA = "odpi.test.hcat.schema.input"; + private final static String OUTPUT_SCHEMA = "odpi.test.hcat.schema.output"; + + @Override + public int run(String[] args) throws Exception { + Configuration conf = getConf(); + args = new GenericOptionsParser(conf, args).getRemainingArgs(); + + String inputTable = args[0]; + String 
outputTable = args[1]; + String inputSchemaStr = args[2]; + String outputSchemaStr = args[3]; + + conf.set(INPUT_SCHEMA, inputSchemaStr); + conf.set(OUTPUT_SCHEMA, outputSchemaStr); + + Job job = new Job(conf, "odpi_hcat_test"); + HCatInputFormat.setInput(job, "default", inputTable); + + job.setInputFormatClass(HCatInputFormat.class); + job.setJarByClass(HCatalogMR.class); + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(WritableComparable.class); + job.setOutputValueClass(HCatRecord.class); + HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null)); + HCatOutputFormat.setSchema(job, HCatSchemaUtils.getHCatSchema(outputSchemaStr)); + job.setOutputFormatClass(HCatOutputFormat.class); + + job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar")); + job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar")); + job.addCacheArchive(new URI("hdfs:/user/gates/hive-exec-1.2.1.jar")); + + return job.waitForCompletion(true) ? 
0 : 1; + + + } + public static class Map extends Mapper<WritableComparable, + HCatRecord, Text, IntWritable> { + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + private HCatSchema inputSchema = null; + + @Override + protected void map(WritableComparable key, HCatRecord value, Context context) + throws IOException, InterruptedException { + if (inputSchema == null) { + inputSchema = + HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(INPUT_SCHEMA)); + } + String line = value.getString("line", inputSchema); + StringTokenizer tokenizer = new StringTokenizer(line); + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + context.write(word, one); + } + } + } + + public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> { + private HCatSchema outputSchema = null; + + @Override + protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws + IOException, InterruptedException { + if (outputSchema == null) { + outputSchema = + HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(OUTPUT_SCHEMA)); + } + int sum = 0; + for (IntWritable i : values) { + sum += i.get(); + } + HCatRecord output = new DefaultHCatRecord(2); + output.set("word", outputSchema, key); + output.set("count", outputSchema, sum); + context.write(null, output); + } + } + } diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java index f5cc3792..7512dabf 100644 --- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java +++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java @@ -36,6 +36,9 @@ public class JdbcConnector { protected static final String LOCATION = "odpi.test.hive.location"; protected static final String METASTORE_URL = 
"odpi.test.hive.metastore.url"; protected static final String TEST_THRIFT = "odpi.test.hive.thrift.test"; + protected static final String TEST_HCATALOG = "odpi.test.hive.hcatalog.test"; + protected static final String HIVE_CONF_DIR = "odpi.test.hive.conf.dir"; + protected static final String HADOOP_CONF_DIR = "odpi.test.hadoop.conf.dir"; protected static Connection conn; diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java new file mode 100644 index 00000000..4b611319 --- /dev/null +++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.odpi.specs.runtime.hive; + +import org.apache.commons.exec.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.thrift.TException; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; + + +public class TestHCatalog { + + 
private static final Log LOG = LogFactory.getLog(TestHCatalog.class.getName()); + + private static IMetaStoreClient client = null; + private static HiveConf conf; + private static HCatSchema inputSchema; + private static HCatSchema outputSchema; + + private Random rand; + + @BeforeClass + public static void connect() throws MetaException { + if (JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog ")) { + String hiveConfDir = JdbcConnector.getProperty(JdbcConnector.HIVE_CONF_DIR, + "Hive conf directory "); + String hadoopConfDir = JdbcConnector.getProperty(JdbcConnector.HADOOP_CONF_DIR, + "Hadoop conf directory "); + conf = new HiveConf(); + String fileSep = System.getProperty("file.separator"); + conf.addResource(new Path(hadoopConfDir + fileSep + "core-site.xml")); + conf.addResource(new Path(hadoopConfDir + fileSep + "hdfs-site.xml")); + conf.addResource(new Path(hadoopConfDir + fileSep + "yarn-site.xml")); + conf.addResource(new Path(hadoopConfDir + fileSep + "mapred-site.xml")); + conf.addResource(new Path(hiveConfDir + fileSep + "hive-site.xml")); + client = new HiveMetaStoreClient(conf); + + } + } + + @Before + public void checkIfActive() { + Assume.assumeTrue(JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog ")); + rand = new Random(); + } + + @Test + public void hcatInputFormatOutputFormat() throws TException, IOException, ClassNotFoundException, + InterruptedException, URISyntaxException { + // Create a table to write to + final String inputTable = "odpi_hcat_input_table_" + rand.nextInt(Integer.MAX_VALUE); + SerDeInfo serde = new SerDeInfo("default_serde", + conf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE), new HashMap<String, String>()); + FieldSchema schema = new FieldSchema("line", "string", ""); + inputSchema = new HCatSchema(Collections.singletonList(new HCatFieldSchema(schema.getName(), + HCatFieldSchema.Type.STRING, schema.getComment()))); + StorageDescriptor sd = new 
StorageDescriptor(Collections.singletonList(schema), null, + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat", false, 0, serde, null, null, + new HashMap<String, String>()); + Table table = new Table(inputTable, "default", "me", 0, 0, 0, sd, null, + new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString()); + client.createTable(table); + + final String outputTable = "odpi_hcat_output_table_" + rand.nextInt(Integer.MAX_VALUE); + sd = new StorageDescriptor(Arrays.asList( + new FieldSchema("word", "string", ""), + new FieldSchema("count", "int", "")), + null, "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat", false, 0, serde, null, null, + new HashMap<String, String>()); + table = new Table(outputTable, "default", "me", 0, 0, 0, sd, null, + new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString()); + client.createTable(table); + outputSchema = new HCatSchema(Arrays.asList( + new HCatFieldSchema("word", HCatFieldSchema.Type.STRING, ""), + new HCatFieldSchema("count", HCatFieldSchema.Type.INT, ""))); + + // TODO Could I use HCatWriter here and the reader to read it? 
+ // Write some stuff into a file in the location of the table + table = client.getTable("default", inputTable); + String inputFile = table.getSd().getLocation() + "/input"; + /* + String inputFile = JdbcConnector.getProperty(JdbcConnector.LOCATION, + "Directory to write a file in ") + "/odpi_hcat_input_" + rand.nextInt(Integer.MAX_VALUE); + */ + Path inputPath = new Path(inputFile); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream out = fs.create(inputPath); + out.writeChars("Mary had a little lamb\n"); + out.writeChars("its fleece was white as snow\n"); + out.writeChars("and everywhere that Mary went\n"); + out.writeChars("the lamb was sure to go\n"); + out.close(); + + Map<String, String> results = HiveHelper.execCommand(new CommandLine("hadoop") + .addArgument("jar") + .addArgument("/Users/gates/git/bigtop/runtime-1.2.0-SNAPSHOT.jar") + .addArgument(HCatalogMR.class.getName()) + .addArgument(inputTable) + .addArgument(outputTable) + .addArgument(inputSchema.getSchemaAsTypeString()) + .addArgument(outputSchema.getSchemaAsTypeString())); + Assert.assertEquals("HCat job failed", 0, Integer.parseInt(results.get("exitValue"))); + + + + /* + Job job = new Job(conf, "odpi_hcat_test"); + HCatInputFormat.setInput(job, "default", inputTable); + + job.setInputFormatClass(HCatInputFormat.class); + job.setJarByClass(TestHCatalog.class); + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(WritableComparable.class); + job.setOutputValueClass(HCatRecord.class); + HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null)); + HCatOutputFormat.setSchema(job, outputSchema); + job.setOutputFormatClass(HCatOutputFormat.class); + + job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar")); + job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar")); + job.addCacheArchive(new 
URI("hdfs:/user/gates/hive-exec-1.2.1.jar")); + + Assert.assertTrue(job.waitForCompletion(true)); + */ + + client.dropTable("default", inputTable); + client.dropTable("default", outputTable); + } + + /* + public static class Map extends Mapper<WritableComparable, + HCatRecord, Text, IntWritable> { + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + @Override + protected void map(WritableComparable key, HCatRecord value, Context context) + throws IOException, InterruptedException { + String line = value.getString("line", inputSchema); + StringTokenizer tokenizer = new StringTokenizer(line); + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + context.write(word, one); + } + } + } + + public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> { + @Override + protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws + IOException, InterruptedException { + int sum = 0; + for (IntWritable i : values) { + sum += i.get(); + } + HCatRecord output = new DefaultHCatRecord(2); + output.set("word", outputSchema, key); + output.set("count", outputSchema, sum); + context.write(null, output); + } + } + */ +} diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java index 5eaab950..8e0abda4 100644 --- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java +++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java @@ -45,7 +45,7 @@ import java.util.Random; public class TestThrift { - private static final Log LOG = LogFactory.getLog(JdbcConnector.class.getName()); + private static final Log LOG = LogFactory.getLog(TestThrift.class.getName()); private static IMetaStoreClient client = null; private static HiveConf conf; |