author | jayunit100 <jayunit100@gmail.com> | 2015-02-16 20:51:39 -0500
committer | jayunit100 <jay@apache.org> | 2015-02-17 10:45:55 -0500
commit | b7e369edd6fa7b97ef4399c56ce66b766035f008 (patch)
tree | 8ea6296536f20fa6fd2dc202b65076e433a4495e
parent | 771ca5670a4f71bc88b5358f5c258289c49add23 (diff)
BIGTOP-1653. Add queries for customer, state, and product statistics w/ d3 friendly JSON output to analytics phase.
7 files changed, 259 insertions, 41 deletions
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 0ed1a055..caf4276e 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -97,3 +97,45 @@ After building the jar (see above), you can run the ETL component like so:
 ```
 spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.etl.SparkETL bigpetstore-spark-X.jar generated_data transformed_data
 ```
+
+Running the SparkSQL component
+------------------------------
+
+Once the data has been ETL'd, we can process it and run analytics against it. The DataModel.scala classes are used to read
+and write the records from files. To run the analytics job, which writes a JSON file at the end, run the following:
+
+```
+spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics bigpetstore-spark-X.jar transformed_data PetStoreStats.json
+```
+
+This writes a JSON file to the given output path (PetStoreStats.json above), formatted (approximately) like this:
+
+```
+{
+  "totalTransaction":12,
+  "transactionsByZip":[
+    {"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
+    {"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
+    {"count":52,"productId":27,"zipcode":"94583"},{"count":84,"productId":19,"zipcode":"33027"},
+    {"count":143,"productId":0,"zipcode":"94583"},{"count":58,"productId":41,"zipcode":"72715"},
+    {"count":76,"productId":54,"zipcode":"15014"},{"count":118,"productId":52,"zipcode":"45439"},
+    ..... (several more) ....
+  "productDetails":[
+    {
+      "productId":0,
+      "category":"kitty litter",
+      "attributes":{
+        "category":"kitty litter",
+        "brand":"Pretty Cat",
+        "size":"7.0",
+        "per_unit_cost":"1.43"
+      }
+    },
+    {
+      "productId":2,
+      "category":"dry cat food",
+      "attributes":{
+```
+
+The above data feeds a front-end web app that will display charts and summary statistics of the transactions.
+Keep tracking Apache BigTop for updates on this front!
\ No newline at end of file
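Note on consuming the output: the stats file above can be read back into typed objects with json4s (the dependency added in build.gradle below). A minimal sketch, assuming case classes shaped like the Statistics model in this patch; the ReadStats object and the hard-coded file name are illustrative only:

```
import java.io.File
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.read

// Shapes assumed from DataModel.scala in this patch.
case class StatisticsTrByZip(count: Long, productId: Long, zipcode: String)
case class Product(productId: Long, category: String, attributes: Map[String, String])
case class Statistics(totalTransaction: Long,
                      transactionsByZip: Array[StatisticsTrByZip],
                      productDetails: Array[Product])

object ReadStats extends App {
  implicit val formats = Serialization.formats(NoTypeHints)
  // Slurp the file and deserialize it in one shot.
  val json = scala.io.Source.fromFile(new File("PetStoreStats.json")).mkString
  val stats = read[Statistics](json)
  println(s"${stats.totalTransaction} total transactions, " +
    s"${stats.transactionsByZip.length} (product, zipcode) rows, " +
    s"${stats.productDetails.length} products")
}
```

json4s binds by field name, so the case-class fields must match the JSON keys (totalTransaction, transactionsByZip, productDetails) exactly.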
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 35ff7bc4..6f3b2d04 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -117,10 +117,13 @@ def updateDependencyVersion(dependencyDetails, dependencyString) {
 dependencies {
   compile "org.apache.spark:spark-assembly_2.10:${sparkVersion}"
   compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
+  compile "joda-time:joda-time:2.7"
+  compile "org.json4s:json4s-jackson_2.10:3.1.0"
 
   testCompile "junit:junit:4.11"
   testCompile "org.hamcrest:hamcrest-all:1.3"
   testCompile "org.scalatest:scalatest_2.10:2.2.1"
+  testCompile "joda-time:joda-time:2.7"
 }
 
 task listJars << {
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index 03d20122..c97bf927 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -15,32 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.bigtop.bigpetstore.spark.generator
+package org.apache.bigtop.bigpetstore.spark.analytics
+
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
 
 import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, Statistics,Transaction}
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.apache.spark.sql._;
 
 import scala.Nothing;
-import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
-import com.github.rnowling.bps.datagenerator.datamodels._
-import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator}
-import com.github.rnowling.bps.datagenerator.framework.SeedFactory
 import org.apache.hadoop.fs.Path
 import scala.collection.JavaConversions._
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
-import java.util.ArrayList
+import java.util.{Calendar, ArrayList, Date}
 import scala.util.Random
 import java.io.File
-import java.util.Date
+import org.json4s.JsonDSL.WithBigDecimal._
 
 object PetStoreStatistics {
 
   private def printUsage() {
     val usage: String =
-      "BigPetStore Analytics Module.\n" +
-      "Usage: inputDir\n" ;
+      "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n" +
+      "Output is a JSON file in outputDir. For schema, see the code." ;
 
     System.err.println(usage)
   }
@@ -52,44 +54,95 @@ object PetStoreStatistics {
    * @param args
    * @return
    */
-  def parseArgs(args: Array[String]):Option[Path] = {
-    if(args.length != 1) {
-      printUsage();
-      return None;
-    }
-    //success, return path.
-    Some(new Path(args(0)));
+  def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
+    (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT path"); None }
+     else Some(new Path(args(0))),
+     if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing OUTPUT path"); None }
+     else Some(new File(args(1))))
   }
 
-  /**
-   * Here we generate an RDD of all the petstore transactions,
-   * by generating the static data first (stores, customers, ...)
-   * followed by running the simulation as a distributed spark task.
-   */
-  def totalTransactions(r:(_,_,_,_,RDD[Transaction]), sc: SparkContext): Statistics = {
-    return Statistics(r._5.count());
+  def productMap(r:Array[Product]) : Map[Long,Product] = {
+    r map (prod => prod.productId -> prod) toMap
+  }
+
+  def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product], RDD[Transaction]),
+                        sc: SparkContext): Statistics = {
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
+
+    import sqlContext._;
+
+    /**
+     * Map the RDD[Transaction] -> RDD[TransactionSQL], converting the Calendar
+     * (which Spark SQL cannot map) into SQL-friendly fields, so that we can
+     * run SparkSQL against it.
+     */
+    val mappableTransactions:RDD[TransactionSQL] =
+      r._5.map(trans => trans.toSQL())
+
+    mappableTransactions.registerTempTable("transactions");
+
+    r._2.registerTempTable("Stores")
+
+    val results: SchemaRDD = sql("SELECT month, count(*) FROM transactions GROUP BY month")
+    val transactionsByMonth = results.collect();
+    for(x <- transactionsByMonth){
+      println(x);
+    }
+
+    val results2: SchemaRDD = sql(
+      """SELECT count(*) c, productId, zipcode
+FROM transactions t
+JOIN Stores s ON t.storeId = s.storeId
+GROUP BY productId, zipcode""")
+    val groupedProductZips = results2.collect();
+
+    //Print the grouped (product, zipcode) counts.
+    for(x <- groupedProductZips){
+      println("grouped product:zip " + x);
+    }
+
+    return Statistics(
+      results.count(), // Total number of transactions.
+      results2.collect().map(r => {
+        //Map each SQL Row into a serializable case class.
+        StatisticsTrByZip(r.getLong(0), r.getLong(1), r.getString(2))
+      }),
+      r._4.collect()); // Product details.
   }
 
   /**
    * We keep a "run" method which can be called easily from tests and also is used by main.
    */
-  def run(transactionsInputDir:String, sc:SparkContext): Boolean = {
+  def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
 
     System.out.println("input : " + transactionsInputDir);
-    val t=totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
-    System.out.println("Transaction count = " + t);
+    val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
     sc.stop()
-    true;
+    stats
   }
 
-  def main(args: Array[String]) {
+  def main(args: Array[String]) {
+    main(
+      args,
+      new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+  }
+
+  def main(args: Array[String], context:SparkContext) = {
     // Get or else : On failure (else) we exit.
-    val inputPath = parseArgs(args).getOrElse {
-      System.exit(1);
-    };
+    val (inputPath,outputPath) = parseArgs(args);
+
+    if(! (inputPath.isDefined && outputPath.isDefined)) {
+      printUsage()
+      System.exit(1)
+    }
 
     System.out.println("Running w/ input = " + inputPath);
-    val conf = new SparkConf().setAppName("BPS Data Generator")
-    run(inputPath.toString,new SparkContext(conf));
+
+    val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+
+    IOUtils.saveLocalAsJSON(outputPath.get, stats)
+
+    System.out.println("Output JSON Stats stored : " + outputPath.get);
+  }
 }
\ No newline at end of file
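For readers unfamiliar with the Spark 1.x SQL API used above (SchemaRDD, later renamed DataFrame), here is a self-contained sketch of the registerTempTable + sql(...) pattern; the Sale row type and TempTableSketch object are invented for illustration:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Toy row type; in the patch this role is played by TransactionSQL.
case class Sale(month: Int, storeId: Long)

object TempTableSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings sql() and the RDD -> SchemaRDD implicits into scope

    val sales = sc.parallelize(Seq(Sale(1, 10L), Sale(1, 11L), Sale(2, 10L)))
    sales.registerTempTable("sales")  // implicit conversion from RDD[Sale]

    // Same shape as the month/count query in PetStoreStatistics.
    sql("SELECT month, count(*) FROM sales GROUP BY month").collect().foreach(println)

    sc.stop()
  }
}
```

The `import sqlContext._` line is what makes both the bare sql(...) call and the implicit RDD-to-SchemaRDD conversion compile, which is why PetStoreStatistics does the same.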
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 19a42602..8eaf707c 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -17,8 +17,23 @@
 
 package org.apache.bigtop.bigpetstore.spark.datamodel
 
+import java.sql.Timestamp
 import java.util.Calendar
 
+import org.apache.spark.sql
+import org.joda.time.DateTime
+import org.json4s.CustomSerializer
+import org.json4s.JsonAST.{JString, JField, JInt, JObject}
+
+/**
+ * Statistics phase. Represents the JSON document served to a front end.
+ * Currently, the transactionsByZip schema = count, productId, zipcode.
+ */
+case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+
+case class Statistics(totalTransaction: Long, transactionsByZip: Array[StatisticsTrByZip], productDetails:Array[Product])
+
 case class Customer(customerId: Long, firstName: String, lastName: String, zipcode: String)
 
@@ -28,9 +43,24 @@ case class Product(productId: Long, category: String, attributes: Map[String, St
 
 case class Store(storeId: Long, zipcode: String)
 
-case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long)
+case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long){
+  /**
+   * Convert to TransactionSQL. There could possibly be a simpler conversion.
+   */
+  def toSQL(): TransactionSQL = {
+    val dt = new DateTime(dateTime);
+    val ts = new Timestamp(dt.getMillis);
+    return TransactionSQL(customerId, transactionId, storeId, ts, productId,
+      dt.getYearOfEra, dt.getMonthOfYear, dt.getDayOfMonth, dt.getHourOfDay, dt.getMinuteOfHour)
+  }
+}
 
 /**
- * Statistics phase. To be expanded...
- * */
-case class Statistics(transactions:Long)
+ * A Transaction which we can create from the natively stored transactions.
+ */
+case class TransactionSQL(customerId: Long, transactionId: Long, storeId: Long, timestamp:Timestamp, productId: Long,
+  year:Int, month:Int, day:Int, hour:Int, minute:Int)
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index 8899abd6..a4d1486d 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -17,13 +17,24 @@
 
 package org.apache.bigtop.bigpetstore.spark.datamodel
 
+import java.io.File
 import java.util.Date
+import java.nio.file.{Path, Paths, Files}
+import java.nio.charset.StandardCharsets
 
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 
 import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.json4s.JsonDSL._
+import org.json4s.JsonDSL.WithDouble._
+import org.json4s.JsonDSL.WithBigDecimal._
+import org.json4s.jackson.Serialization
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.Serialization.{read, write}
 
 /**
  * Utility functions for loading and saving data model RDDs.
@@ -35,6 +46,8 @@ object IOUtils {
   private val PRODUCT_DIR = "products"
   private val TRANSACTION_DIR = "transactions"
 
+  private val ANALYTICS_STATS_DIR = "analytics_stats"
+
   /**
    * Save RDDs of the data model as Sequence files.
    *
@@ -56,11 +69,28 @@
     transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR)
   }
 
+  def saveLocalAsJSON(outputDir: File, statistics: Statistics) {
+    //Load the json4s write/read formats.
+    implicit val formats = Serialization.formats(NoTypeHints)
+    val json:String = write(statistics)
+    Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
+  }
+
+  def readLocalAsStatistics(jsonFile: File):Statistics = {
+    //Load the json4s write/read formats.
+    implicit val formats = Serialization.formats(NoTypeHints)
+    //Read the file as a String, and deserialize it into a Statistics object.
+    //See http://json4s.org/ for examples.
+    read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+  }
+
   /**
    * Load RDDs of the data model from Sequence files.
    *
    * @param sc SparkContext
    * @param inputDir Directory containing Sequence files
+   *
+   * TODO Should take a Path, not a String; a String makes input validation complex.
    */
   def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store],
     RDD[Customer], RDD[Product], RDD[Transaction]) = {
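The toSQL() conversion above exists because Spark SQL's reflection-based schema inference cannot map a java.util.Calendar field. A stand-alone sketch of the same flattening, assuming only the joda-time dependency added in this patch (CalendarFlattening is an illustrative name):

```
import java.sql.Timestamp
import java.util.Calendar
import org.joda.time.DateTime

// Minimal stand-alone version of the Calendar -> SQL-friendly flattening
// performed by Transaction.toSQL() in this patch.
object CalendarFlattening extends App {
  val cal = Calendar.getInstance()

  val dt = new DateTime(cal)            // joda-time accepts a Calendar directly
  val ts = new Timestamp(dt.getMillis)  // Timestamp is a type Spark SQL understands

  println(s"ts=$ts year=${dt.getYear} month=${dt.getMonthOfYear} " +
    s"day=${dt.getDayOfMonth} hour=${dt.getHourOfDay}")
}
```

Pre-extracting year/month/day/hour as Int columns is what makes queries like the GROUP BY month in PetStoreStatistics possible.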
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index 747a4775..e460e2e9 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -1,9 +1,10 @@
 package org.apache.bigpetstore.spark
 
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils}
 import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters
 import org.apache.bigtop.bigpetstore.spark.etl.SparkETL
 import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL}
-import org.apache.bigtop.bigpetstore.spark.generator.PetStoreStatistics
 import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
@@ -55,9 +56,25 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
     assert(customers==1000L)
     assert(products==55L)
     //assert(transactions==45349L)
-
+    val analyticsJson = new File(tmpDir,"analytics.json")
     //Now do the analytics.
-    PetStoreStatistics.run(etlDir.getAbsolutePath, sc);
+
+    PetStoreStatistics.main(
+      Array(
+        etlDir.getAbsolutePath,
+        analyticsJson.getAbsolutePath),
+      sc);
+
+    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+
+    /**
+     * Assert some very generic features. We will refine this later once
+     * consistency is implemented.
+     * See https://github.com/rnowling/bigpetstore-data-generator/issues/38
+     */
+    assert(stats.totalTransaction > 5);
+    //TODO : Will add more assertions here, see comment above.
+    assert(stats.productDetails.length > 10);
 
     sc.stop()
   }
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
new file mode 100644
index 00000000..b7724ccf
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.bigpetstore.spark.analytics
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.common.collect.ImmutableMap
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.Product
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+import Array._
+
+import java.io.File
+import java.nio.file.Files
+import java.util.Calendar
+import java.util.Locale
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
+
+  test("product mapper") {
+    val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
+    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+  }
+}
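A natural follow-on to AnalyticsSuite, not part of this patch, would be a round-trip test for the saveLocalAsJSON/readLocalAsStatistics pair defined in IOUtils.scala above. A sketch, assuming those exact signatures (RoundTripSketch is an illustrative name):

```
import java.io.File
import java.nio.file.Files
import org.apache.bigtop.bigpetstore.spark.datamodel._

// Sketch: round-trip a Statistics value through JSON and compare fields.
object RoundTripSketch extends App {
  val stats = Statistics(
    totalTransaction = 3,
    transactionsByZip = Array(StatisticsTrByZip(3L, 7L, "11368")),
    productDetails = Array(Product(7L, "dog food", Map("brand" -> "Happy Pup"))))

  val f = new File(Files.createTempDirectory("bps").toFile, "stats.json")
  IOUtils.saveLocalAsJSON(f, stats)
  val back = IOUtils.readLocalAsStatistics(f)

  // Arrays compare by reference, so check the contents element-wise.
  assert(back.totalTransaction == stats.totalTransaction)
  assert(back.transactionsByZip(0) == stats.transactionsByZip(0))
  assert(back.productDetails(0) == stats.productDetails(0))
}
```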