| author    | RJ Nowling <rnowling@gmail.com>                  | 2015-02-18 08:27:26 -0600 |
| --------- | ------------------------------------------------ | ------------------------- |
| committer | jayunit100 <jay@apache.org>                      | 2015-02-18 15:54:41 -0500 |
| commit    | 770d50b65a0585bc90251696910668377c6d24b6 (patch) |                           |
| tree      | 5be871b11a18ec39a74d1c9f5d5d447a2b1f41d0         |                           |
| parent    | b8ea3e0de1d29b132a634ff29b593d29c717aef8 (diff)  |                           |
[BigPetStore] Clean up Spark SQL analytics module. Adds transactions/month and transactions/product output.
7 files changed, 150 insertions, 104 deletions
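The heart of the patch is a handful of Spark SQL GROUP BY aggregations (transactions per month, per product, and per product/zipcode) run against a temp table of ETL'd transactions. The minimal sketch below reproduces the two new single-table aggregations outside the module, against the Spark 1.x SchemaRDD-era API that the patch itself uses. The `Tx` case class, the sample rows, and the object name are invented stand-ins for the module's `TransactionSQL` records; only the SQL statements mirror the patch that follows.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Invented stand-in for the ETL'd TransactionSQL records used by the real module.
case class Tx(transactionId: Long, productId: Long, month: Int)

object MonthlyProductCountsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("MonthlyProductCountsSketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    // Register a tiny in-memory table in place of the real "Transactions" temp table.
    sc.parallelize(Seq(Tx(1L, 14L, 1), Tx(2L, 14L, 1), Tx(3L, 54L, 2)))
      .registerTempTable("Transactions")

    // The two aggregations the patch adds: counts per month and counts per product.
    sql("SELECT count(*), month FROM Transactions GROUP BY month")
      .collect().foreach(r => println(s"month=${r.getInt(1)} count=${r.getLong(0)}"))
    sql("SELECT count(*) c, productId FROM Transactions GROUP BY productId")
      .collect().foreach(r => println(s"productId=${r.getLong(1)} count=${r.getLong(0)}"))

    sc.stop()
  }
}
```

As in the patch, the count comes back as a `Long` in column 0 with the grouping key in column 1, which is why the real code reads `r.getLong(0)` and `r.getInt(1)` / `r.getLong(1)` off each `Row`.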
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index caf4276e..6ec74898 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -108,11 +108,18 @@ from files. To run the analytics job, which outputs a JSON file at the end, you
 spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics bigpetstore-spark-X.jar transformed\_data PetStoreStats.json
 ```
+Current queries include:
+
+1. Total Transactions
+2. Transaction Counts by Month
+3. Transaction Counts by Product
+4. Transaction Counts by Product and Store Zipcode
+
 This will output a JSON file to the /tmp directory, which has formatting (approximately) like this.
 ```
 {
-  "totalTransaction":12,
+  "totalTransaction":34586,
   "transactionsByZip":[
     {"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
     {"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
@@ -138,4 +145,4 @@ This will output a JSON file to the /tmp directory, which has formatting (approx
 ```
 Of course, the above data is for a front end web app which will display charts/summary stats of the transactions.
-Keep tracking Apache BigTop for updates on this front !
\ No newline at end of file
+Keep tracking Apache BigTop for updates on this front !
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index c97bf927..2d376a40 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -17,32 +17,29 @@ package org.apache.bigtop.bigpetstore.spark.analytics
-
+import java.io.File
 import java.sql.Timestamp
-import org.joda.time.DateTime
+import scala.Nothing
-import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel._
-import org.apache.spark.sql._;
-import scala.Nothing;
-import org.apache.hadoop.fs.Path
-import scala.collection.JavaConversions._
+import org.apache.spark.sql._
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
-import java.util.{Calendar, ArrayList, Date}
-import scala.util.Random
-import java.io.File
+import org.joda.time.DateTime
 import org.json4s.JsonDSL.WithBigDecimal._
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
 object PetStoreStatistics {
 
   private def printUsage() {
-    val usage: String =
-      "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n " +
-      "Ouptut is a JSON file in outputDir. For schema, see the code." ;
+    val usage: String = "BigPetStore Analytics Module." +
+      "\n" +
+      "Usage: spark-submit ... inputDir outputFile\n " +
+      "inputDir - (string) Path to ETL'd data\n" +
+      "outputFile - (string) is a JSON file. For schema, see the code.\n"
 
     System.err.println(usage)
   }
@@ -54,95 +51,131 @@ object PetStoreStatistics {
    * @param args
    * @return
    */
-  def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
-    (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT path"); None } else Some(new Path(args(0))),
-     if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing OUTPUT path");; None } else Some(new File(args(1))))
+  def parseArgs(args: Array[String]):(Option[String],Option[String]) = {
+    if(args.length < 1) {
+      (None, None)
+    } else if (args.length == 1) {
+      (Some(args(0)), None)
+    } else {
+      (Some(args(0)), Some(args(1)))
+    }
   }
 
   def productMap(r:Array[Product]) : Map[Long,Product] = {
     r map (prod => prod.productId -> prod) toMap
   }
 
-  def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product], RDD[Transaction]),
-    sc: SparkContext): Statistics = {
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
-
-    import sqlContext._;
-
-    /**
-     * Transform the non-sparksql mappable calendar
-     * into a spark sql freindly field.
-     */
-    val mappableTransactions:RDD[TransactionSQL] =
-      /**
-       * Map the RDD[Transaction] -> RDD[TransactionSQL] so that we can run SparkSQL against it.
-       */
-      r._5.map(trans => trans.toSQL())
+  def queryTxByMonth(sqlContext: SQLContext): Array[StatisticsTxByMonth] = {
+    import sqlContext._
 
-    mappableTransactions.registerTempTable("transactions");
+    val results: SchemaRDD = sql("SELECT count(*), month FROM Transactions GROUP BY month")
+    val transactionsByMonth = results.collect()
+    for(x<-transactionsByMonth){
+      println(x)
+    }
 
-    r._2.registerTempTable("Stores")
+    transactionsByMonth.map { r =>
+      StatisticsTxByMonth(r.getInt(1), r.getLong(0))
+    }
+  }
 
-    val results: SchemaRDD = sql("SELECT month,count(*) FROM transactions group by month")
-    val transactionsByMonth = results.collect();
-    for(x<-transactionsByMonth){
-      println(x);
-    }
+  def queryTxByProductZip(sqlContext: SQLContext): Array[StatisticsTxByProductZip] = {
+    import sqlContext._
 
-    val results2: SchemaRDD = sql(
-      """SELECT count(*) c, productId , zipcode
-FROM transactions t
+    val results: SchemaRDD = sql(
+      """SELECT count(*) c, productId, zipcode
+FROM Transactions t
 JOIN Stores s ON t.storeId = s.storeId
 GROUP BY productId, zipcode""")
 
-    val groupedProductZips = results2.collect();
-    //get list of all transactionsData
-    for(x<-groupedProductZips){
-      println("grouped product:zip " + x);
-    }
+    val groupedProductZips = results.collect()
 
-    return Statistics(
-      results.count(), // Total number of transaction
-      results2.collect().map(r => {
-        //Map JDBC Row into a Serializable case class.
-        StatisticsTrByZip(r.getLong(0),r.getLong(1),r.getString(2))
-      }),
-      r._4.collect()); // Product details.
+    //get list of all transactionsData
+    for(x<-groupedProductZips){
+      println("grouped product:zip " + x)
     }
 
-  /**
-   * We keep a "run" method which can be called easily from tests and also is used by main.
-   */
-  def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
-    System.out.println("input : " + transactionsInputDir);
-    val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
-    sc.stop()
-    stats
+    //Map JDBC Row into a Serializable case class.
+    groupedProductZips.map { r =>
+      StatisticsTxByProductZip(r.getLong(1),r.getString(2),r.getLong(0))
   }
+  }
 
-  def main(args: Array[String]) {
-    main(
-      args,
-      new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+  def queryTxByProduct(sqlContext: SQLContext): Array[StatisticsTxByProduct] = {
+    import sqlContext._
+
+    val results: SchemaRDD = sql(
+      """SELECT count(*) c, productId FROM Transactions GROUP BY productId""")
+
+    val groupedProducts = results.collect()
+
+    //Map JDBC Row into a Serializable case class.
+    groupedProducts.map { r =>
+      StatisticsTxByProduct(r.getLong(1),r.getLong(0))
+    }
   }
 
-  def main(args: Array[String], context:SparkContext) = {
-    // Get or else : On failure (else) we exit.
-    val (inputPath,outputPath)= parseArgs(args);
-    if(! (inputPath.isDefined && outputPath.isDefined)) {
-      printUsage()
-      System.exit(1)
-    }
+  def runQueries(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product],
+    RDD[Transaction]), sc: SparkContext): Statistics = {
+
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    import sqlContext._
+
+    // Transform the Non-SparkSQL Calendar into a SparkSQL-friendly field.
+    val mappableTransactions:RDD[TransactionSQL] =
+      r._5.map { trans => trans.toSQL() }
+
+    r._1.registerTempTable("Locations")
+    r._2.registerTempTable("Stores")
+    r._3.registerTempTable("Customers")
+    r._4.registerTempTable("Product")
+    mappableTransactions.registerTempTable("Transactions")
+
+
+    val txByMonth = queryTxByMonth(sqlContext)
+    val txByProduct = queryTxByProduct(sqlContext)
+    val txByProductZip = queryTxByProductZip(sqlContext)
+
+    return Statistics(
+      txByMonth.map { s => s.count }.reduce(_+_), // Total number of transactions
+      txByMonth,
+      txByProduct,
+      txByProductZip,
+      r._4.collect()) // Product details
+  }
 
-    System.out.println("Running w/ input = " + inputPath);
+  /**
+   * We keep a "run" method which can be called easily from tests and also is used by main.
+   */
+  def run(txInputDir:String, statsOutputFile:String,
+    sc:SparkContext) {
 
-    val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+    System.out.println("Running w/ input = " + txInputDir)
 
-    IOUtils.saveLocalAsJSON(outputPath.get, stats)
+    System.out.println("input : " + txInputDir)
+    val etlData = IOUtils.load(sc, txInputDir)
 
-    System.out.println("Output JSON Stats stored : " + outputPath.get);
+    val stats = runQueries(etlData, sc)
+    IOUtils.saveLocalAsJSON(new File(statsOutputFile), stats)
+
+    System.out.println("Output JSON Stats stored : " + statsOutputFile)
   }
-}
\ No newline at end of file
+  def main(args: Array[String]) {
+    // Get or else : On failure (else) we exit.
+    val (inputPath,outputPath) = parseArgs(args)
+
+    if(! (inputPath.isDefined && outputPath.isDefined)) {
+      printUsage()
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf().setAppName("PetStoreStatistics"))
+
+    run(inputPath.get, outputPath.get, sc)
+
+    sc.stop()
+  }
+}
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 8eaf707c..59cc304c 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -26,13 +26,20 @@ import org.json4s.CustomSerializer
 import org.json4s.JsonAST.{JString, JField, JInt, JObject}
 
 /**
- * Statistics phase. Represents A JSON for a front end.
- * Currently, transactionByZip schema = count, productId, zip
- * */
+ * Statistics phase. Represents JSON for a front end.
+ */
+
+case class StatisticsTxByMonth(month: Int, count: Long)
 
-case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+case class StatisticsTxByProductZip(productId:Long, zipcode:String, count:Long)
 
-case class Statistics(totalTransaction: Long, transactionsByZip: Array[StatisticsTrByZip], productDetails:Array[Product])
+case class StatisticsTxByProduct(count: Long, productId: Long)
+
+case class Statistics(totalTransactions: Long,
+  transactionsByMonth: Array[StatisticsTxByMonth],
+  transactionsByProduct: Array[StatisticsTxByProduct],
+  transactionsByProductZip: Array[StatisticsTxByProductZip],
+  productDetails:Array[Product])
 
 case class Customer(customerId: Long, firstName: String, lastName: String, zipcode: String)
@@ -44,13 +51,14 @@ case class Product(productId: Long, category: String, attributes: Map[String, St
 case class Store(storeId: Long, zipcode: String)
 
 case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long){
+
   /**
    * Convert to TransactionSQL.
    * There possibly could be a conversion.
    */
   def toSQL(): TransactionSQL = {
-    val dt = new DateTime(dateTime);
-    val ts = new Timestamp(dt.getMillis);
+    val dt = new DateTime(dateTime)
+    val ts = new Timestamp(dt.getMillis)
     return TransactionSQL(customerId,transactionId,storeId,
       new Timestamp( new DateTime(dateTime).getMillis),
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index a4d1486d..5432e3c6 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -81,7 +81,7 @@ object IOUtils {
     implicit val formats = Serialization.formats(NoTypeHints)
     //Read file as String, and serialize it into Stats object.
     //See http://json4s.org/ examples.
-    read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+    read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_))
   }
 
   /**
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index e460e2e9..9d5cb843 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -30,7 +30,7 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
   val sc = new SparkContext(conf)
 
   override def afterAll() {
-    sc.stop();
+    sc.stop()
   }
 
   test("Full integration test.") {
@@ -40,7 +40,7 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
     //stores, customers, days, randomSeed
     val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0","123456789")
 
-    SparkDriver.parseArgs(parameters);
+    SparkDriver.parseArgs(parameters)
     val transactionRDD = SparkDriver.generateData(sc)
     SparkDriver.writeData(transactionRDD)
 
@@ -49,33 +49,31 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
     val etlDir:File = Files.createTempDirectory("BPSTest_ETL2").toFile()
     System.out.println(etlDir.getAbsolutePath + "== "+etlDir.list())
 
-    val (locations,stores,customers,products,transactions) = SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath));
+    val (locations,stores,customers,products,transactions) = SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath))
 
     // assert(locations==400L) TODO : This seems to vary (325,400,)
     assert(stores==10L)
     assert(customers==1000L)
     assert(products==55L)
     //assert(transactions==45349L)
 
-    val analyticsJson = new File(tmpDir,"analytics.json")
+    //Now do the analytics.
+    val analyticsJson = new File(tmpDir,"analytics.json")
 
-    PetStoreStatistics.main(
-      Array(
-        etlDir.getAbsolutePath,
-        analyticsJson.getAbsolutePath),
-      sc);
+    PetStoreStatistics.run(etlDir.getAbsolutePath,
+      analyticsJson.getAbsolutePath, sc)
 
-    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson)
 
     /**
      * Assert some very generic features. We will refine this later once
      * consistency is implemented.
      * See https://github.com/rnowling/bigpetstore-data-generator/issues/38
      */
-    assert(stats.totalTransaction > 5);
-    //TODO : Will add more assertions here, see comment above
-    assert(stats.productDetails.length > 10);
+    assert(stats.totalTransactions === transactions)
+    assert(stats.productDetails.length === products)
+    assert(stats.transactionsByMonth.length === 12)
 
     sc.stop()
   }
-}
\ No newline at end of file
+}
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
index b7724ccf..d9ed3907 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -38,6 +38,6 @@ class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
   test("product mapper") {
     val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
 
-    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p)
   }
 }
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
index 7b1e1f5f..49ff1a4e 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -40,7 +40,7 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
   val sc = new SparkContext(conf)
 
   override def afterAll() {
-    sc.stop();
+    sc.stop()
   }
 
   test("Saving & Loading data") {
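With the reshaped `Statistics` case class above, the JSON that `IOUtils.saveLocalAsJSON` writes now carries `totalTransactions` plus per-month, per-product, and per-product/zip arrays rather than the old `totalTransaction`/`transactionsByZip` pair shown in the README excerpt. The sketch below illustrates that shape using stand-in copies of the case classes and json4s serialization with `NoTypeHints`, as the module's IOUtils does; the json4s-native backend, the object name, and all values are assumptions for illustration only.

```scala
import org.json4s.NoTypeHints
import org.json4s.native.Serialization

// Stand-in copies of the case classes introduced in this patch.
case class StatisticsTxByMonth(month: Int, count: Long)
case class StatisticsTxByProduct(count: Long, productId: Long)
case class StatisticsTxByProductZip(productId: Long, zipcode: String, count: Long)
case class Product(productId: Long, category: String, attributes: Map[String, String])
case class Statistics(totalTransactions: Long,
  transactionsByMonth: Array[StatisticsTxByMonth],
  transactionsByProduct: Array[StatisticsTxByProduct],
  transactionsByProductZip: Array[StatisticsTxByProductZip],
  productDetails: Array[Product])

object StatsJsonShapeSketch {
  def main(args: Array[String]): Unit = {
    implicit val formats = Serialization.formats(NoTypeHints)

    // Made-up sample values; only the field names match the data model.
    val stats = Statistics(
      totalTransactions = 3,
      transactionsByMonth = Array(StatisticsTxByMonth(1, 2), StatisticsTxByMonth(2, 1)),
      transactionsByProduct = Array(StatisticsTxByProduct(2, 14L), StatisticsTxByProduct(1, 54L)),
      transactionsByProductZip = Array(StatisticsTxByProductZip(14L, "11368", 2)),
      productDetails = Array(Product(14L, "dry cat food", Map("flavor" -> "fish"))))

    // Prints e.g. {"totalTransactions":3,"transactionsByMonth":[{"month":1,"count":2},...],...}
    println(Serialization.write(stats))
  }
}
```

Field names come straight from the case class declarations, so a front end consuming PetStoreStats.json should key off `totalTransactions`, `transactionsByMonth`, `transactionsByProduct`, and `transactionsByProductZip`.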