author     RJ Nowling <rnowling@gmail.com>    2015-02-18 08:27:26 -0600
committer  jayunit100 <jay@apache.org>        2015-02-18 15:54:41 -0500
commit     770d50b65a0585bc90251696910668377c6d24b6
tree       5be871b11a18ec39a74d1c9f5d5d447a2b1f41d0
parent     b8ea3e0de1d29b132a634ff29b593d29c717aef8
[BigPetStore] Clean up Spark SQL analytics module. Adds transactions/month and transactions/product output.
 bigtop-bigpetstore/bigpetstore-spark/README.md                                                                    |  11
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala | 189
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala         |  22
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala           |   2
 bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala            |  26
 bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala    |   2
 bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala      |   2
7 files changed, 150 insertions(+), 104 deletions(-)
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index caf4276e..6ec74898 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -108,11 +108,18 @@ from files. To run the analytics job, which outputs a JSON file at the end, you
spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics bigpetstore-spark-X.jar transformed\_data PetStoreStats.json
```
+Current queries include:
+
+1. Total Transactions
+2. Transaction Counts by Month
+3. Transaction Counts by Product
+4. Transaction Counts by Product and Store Zipcode
+
This will output a JSON file to the /tmp directory, which has formatting (approximately) like this.
```
{
- "totalTransaction":12,
+ "totalTransaction":34586,
"transactionsByZip":[
{"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
{"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
@@ -138,4 +145,4 @@ This will output a JSON file to the /tmp directory, which has formatting (approx
```
Of course, the above data is for a front end web app which will display charts/summary stats of the transactions.
-Keep tracking Apache BigTop for updates on this front ! \ No newline at end of file
+Keep tracking Apache BigTop for updates on this front !
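
The README above only shows the JSON output by example. As a rough illustration of how a downstream consumer (such as the front end mentioned above) could load that file, here is a minimal sketch that mirrors the json4s-based `read[Statistics]` call used by `IOUtils` in this module; the output path and the standalone object name are assumptions for illustration, not part of this patch.

```
// Sketch only: deserialize the analytics output back into the Statistics model,
// following the same json4s pattern as IOUtils.readLocalAsStatistics.
import org.json4s._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

import org.apache.bigtop.bigpetstore.spark.datamodel._

object StatsReaderSketch {
  def main(args: Array[String]): Unit = {
    implicit val formats = Serialization.formats(NoTypeHints)

    // Read the whole file as one string, then deserialize it into Statistics.
    // The path is an assumption; use whatever outputFile was passed to spark-submit.
    val json  = scala.io.Source.fromFile("/tmp/PetStoreStats.json").getLines.mkString
    val stats = read[Statistics](json)

    println("total transactions: " + stats.totalTransactions)
    stats.transactionsByMonth.foreach(m => println(m.month + " -> " + m.count))
  }
}
```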
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index c97bf927..2d376a40 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -17,32 +17,29 @@
package org.apache.bigtop.bigpetstore.spark.analytics
-
+import java.io.File
import java.sql.Timestamp
-import org.joda.time.DateTime
+import scala.Nothing
-import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel._
-import org.apache.spark.sql._;
-import scala.Nothing;
-import org.apache.hadoop.fs.Path
-import scala.collection.JavaConversions._
+import org.apache.spark.sql._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
-import java.util.{Calendar, ArrayList, Date}
-import scala.util.Random
-import java.io.File
+import org.joda.time.DateTime
import org.json4s.JsonDSL.WithBigDecimal._
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
object PetStoreStatistics {
private def printUsage() {
- val usage: String =
- "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n " +
- "Ouptut is a JSON file in outputDir. For schema, see the code." ;
+ val usage: String = "BigPetStore Analytics Module." +
+ "\n" +
+ "Usage: spark-submit ... inputDir outputFile\n " +
+ "inputDir - (string) Path to ETL'd data\n" +
+ "outputFile - (string) is a JSON file. For schema, see the code.\n"
System.err.println(usage)
}
@@ -54,95 +51,131 @@ object PetStoreStatistics {
* @param args
* @return
*/
- def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
- (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT path"); None } else Some(new Path(args(0))),
- if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing OUTPUT path");; None } else Some(new File(args(1))))
+ def parseArgs(args: Array[String]):(Option[String],Option[String]) = {
+ if(args.length < 1) {
+ (None, None)
+ } else if (args.length == 1) {
+ (Some(args(0)), None)
+ } else {
+ (Some(args(0)), Some(args(1)))
+ }
}
def productMap(r:Array[Product]) : Map[Long,Product] = {
r map (prod => prod.productId -> prod) toMap
}
- def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product], RDD[Transaction]),
- sc: SparkContext): Statistics = {
- val sqlContext = new org.apache.spark.sql.SQLContext(sc);
-
- import sqlContext._;
-
- /**
- * Transform the non-sparksql mappable calendar
- * into a spark sql freindly field.
- */
- val mappableTransactions:RDD[TransactionSQL] =
- /**
- * Map the RDD[Transaction] -> RDD[TransactionSQL] so that we can run SparkSQL against it.
- */
- r._5.map(trans => trans.toSQL())
+ def queryTxByMonth(sqlContext: SQLContext): Array[StatisticsTxByMonth] = {
+ import sqlContext._
- mappableTransactions.registerTempTable("transactions");
+ val results: SchemaRDD = sql("SELECT count(*), month FROM Transactions GROUP BY month")
+ val transactionsByMonth = results.collect()
+ for(x<-transactionsByMonth){
+ println(x)
+ }
- r._2.registerTempTable("Stores")
+ transactionsByMonth.map { r =>
+ StatisticsTxByMonth(r.getInt(1), r.getLong(0))
+ }
+ }
- val results: SchemaRDD = sql("SELECT month,count(*) FROM transactions group by month")
- val transactionsByMonth = results.collect();
- for(x<-transactionsByMonth){
- println(x);
- }
+ def queryTxByProductZip(sqlContext: SQLContext): Array[StatisticsTxByProductZip] = {
+ import sqlContext._
- val results2: SchemaRDD = sql(
- """SELECT count(*) c, productId , zipcode
-FROM transactions t
+ val results: SchemaRDD = sql(
+ """SELECT count(*) c, productId, zipcode
+FROM Transactions t
JOIN Stores s ON t.storeId = s.storeId
GROUP BY productId, zipcode""")
- val groupedProductZips = results2.collect();
- //get list of all transactionsData
- for(x<-groupedProductZips){
- println("grouped product:zip " + x);
- }
+ val groupedProductZips = results.collect()
- return Statistics(
- results.count(), // Total number of transaction
- results2.collect().map(r => {
- //Map JDBC Row into a Serializable case class.
- StatisticsTrByZip(r.getLong(0),r.getLong(1),r.getString(2))
- }),
- r._4.collect()); // Product details.
+ //get list of all transactionsData
+ for(x<-groupedProductZips){
+ println("grouped product:zip " + x)
}
- /**
- * We keep a "run" method which can be called easily from tests and also is used by main.
- */
- def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
- System.out.println("input : " + transactionsInputDir);
- val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
- sc.stop()
- stats
+ //Map JDBC Row into a Serializable case class.
+ groupedProductZips.map { r =>
+ StatisticsTxByProductZip(r.getLong(1),r.getString(2),r.getLong(0))
}
+ }
- def main(args: Array[String]) {
- main(
- args,
- new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+ def queryTxByProduct(sqlContext: SQLContext): Array[StatisticsTxByProduct] = {
+ import sqlContext._
+
+ val results: SchemaRDD = sql(
+ """SELECT count(*) c, productId FROM Transactions GROUP BY productId""")
+
+ val groupedProducts = results.collect()
+
+ //Map JDBC Row into a Serializable case class.
+ groupedProducts.map { r =>
+ StatisticsTxByProduct(r.getLong(1),r.getLong(0))
+ }
}
- def main(args: Array[String], context:SparkContext) = {
- // Get or else : On failure (else) we exit.
- val (inputPath,outputPath)= parseArgs(args);
- if(! (inputPath.isDefined && outputPath.isDefined)) {
- printUsage()
- System.exit(1)
- }
+ def runQueries(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product],
+ RDD[Transaction]), sc: SparkContext): Statistics = {
+
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ import sqlContext._
+
+ // Transform the Non-SparkSQL Calendar into a SparkSQL-friendly field.
+ val mappableTransactions:RDD[TransactionSQL] =
+ r._5.map { trans => trans.toSQL() }
+
+ r._1.registerTempTable("Locations")
+ r._2.registerTempTable("Stores")
+ r._3.registerTempTable("Customers")
+ r._4.registerTempTable("Product")
+ mappableTransactions.registerTempTable("Transactions")
+
+
+ val txByMonth = queryTxByMonth(sqlContext)
+ val txByProduct = queryTxByProduct(sqlContext)
+ val txByProductZip = queryTxByProductZip(sqlContext)
+
+ return Statistics(
+ txByMonth.map { s => s.count }.reduce(_+_), // Total number of transactions
+ txByMonth,
+ txByProduct,
+ txByProductZip,
+ r._4.collect()) // Product details
+ }
- System.out.println("Running w/ input = " + inputPath);
+ /**
+ * We keep a "run" method which can be called easily from tests and also is used by main.
+ */
+ def run(txInputDir:String, statsOutputFile:String,
+ sc:SparkContext) {
- val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+ System.out.println("Running w/ input = " + txInputDir)
- IOUtils.saveLocalAsJSON(outputPath.get, stats)
+ System.out.println("input : " + txInputDir)
+ val etlData = IOUtils.load(sc, txInputDir)
- System.out.println("Output JSON Stats stored : " + outputPath.get);
+ val stats = runQueries(etlData, sc)
+ IOUtils.saveLocalAsJSON(new File(statsOutputFile), stats)
+
+ System.out.println("Output JSON Stats stored : " + statsOutputFile)
}
-} \ No newline at end of file
+ def main(args: Array[String]) {
+ // Get or else : On failure (else) we exit.
+ val (inputPath,outputPath) = parseArgs(args)
+
+ if(! (inputPath.isDefined && outputPath.isDefined)) {
+ printUsage()
+ System.exit(1)
+ }
+
+ val sc = new SparkContext(new SparkConf().setAppName("PetStoreStatistics"))
+
+ run(inputPath.get, outputPath.get, sc)
+
+ sc.stop()
+ }
+}
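
For readers unfamiliar with the Spark 1.2-era SQL API that `queryTxByMonth` and the other query methods rely on, the following is a minimal, self-contained sketch of the same pattern: register an RDD of case classes as a temp table, run a GROUP BY query, and map the collected `Row`s back into plain values. The `Tx` case class and its sample data are made up for illustration and are not part of this patch.

```
// Sketch only: the temp-table + GROUP BY + Row-mapping pattern used by runQueries.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Tx(transactionId: Long, productId: Long, month: Int)

object GroupBySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("GroupBySketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings sql(...) and the createSchemaRDD implicit into scope

    // Register an RDD of case classes as a SQL temp table.
    val txs = sc.parallelize(Seq(Tx(1L, 10L, 1), Tx(2L, 10L, 2), Tx(3L, 11L, 2)))
    txs.registerTempTable("Transactions")

    // Same shape as queryTxByMonth: aggregate, collect, then read columns off each Row.
    val byMonth = sql("SELECT count(*), month FROM Transactions GROUP BY month")
      .collect()
      .map(r => (r.getInt(1), r.getLong(0)))  // (month, count)

    byMonth.foreach(println)
    sc.stop()
  }
}
```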
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 8eaf707c..59cc304c 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -26,13 +26,20 @@ import org.json4s.CustomSerializer
import org.json4s.JsonAST.{JString, JField, JInt, JObject}
/**
- * Statistics phase. Represents A JSON for a front end.
- * Currently, transactionByZip schema = count, productId, zip
- * */
+ * Statistics phase. Represents JSON for a front end.
+ */
+
+case class StatisticsTxByMonth(month: Int, count: Long)
-case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+case class StatisticsTxByProductZip(productId:Long, zipcode:String, count:Long)
-case class Statistics(totalTransaction: Long, transactionsByZip: Array[StatisticsTrByZip], productDetails:Array[Product])
+case class StatisticsTxByProduct(count: Long, productId: Long)
+
+case class Statistics(totalTransactions: Long,
+ transactionsByMonth: Array[StatisticsTxByMonth],
+ transactionsByProduct: Array[StatisticsTxByProduct],
+ transactionsByProductZip: Array[StatisticsTxByProductZip],
+ productDetails:Array[Product])
case class Customer(customerId: Long, firstName: String,
lastName: String, zipcode: String)
@@ -44,13 +51,14 @@ case class Product(productId: Long, category: String, attributes: Map[String, St
case class Store(storeId: Long, zipcode: String)
case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long){
+
/**
* Convert to TransactionSQL.
* There possibly could be a conversion.
*/
def toSQL(): TransactionSQL = {
- val dt = new DateTime(dateTime);
- val ts = new Timestamp(dt.getMillis);
+ val dt = new DateTime(dateTime)
+ val ts = new Timestamp(dt.getMillis)
return TransactionSQL(customerId,transactionId,storeId,
new Timestamp(
new DateTime(dateTime).getMillis),
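
To make the reshaped `Statistics` model concrete, here is a small sketch that builds one by hand from the new case classes. All sample values are invented for illustration; only the case-class shapes come from the patch.

```
// Sketch only: assembling a Statistics value from the new datamodel case classes.
import org.apache.bigtop.bigpetstore.spark.datamodel._

object DataModelSketch {
  def main(args: Array[String]): Unit = {
    val byMonth   = Array(StatisticsTxByMonth(month = 1, count = 120L),
                          StatisticsTxByMonth(month = 2, count = 98L))
    val byProduct = Array(StatisticsTxByProduct(count = 57L, productId = 14L))
    val byZip     = Array(StatisticsTxByProductZip(productId = 14L, zipcode = "11368", count = 57L))
    val products  = Array(Product(14L, "dog-food", Map("flavor" -> "fish")))  // hypothetical product

    val stats = Statistics(
      totalTransactions        = byMonth.map(_.count).sum,
      transactionsByMonth      = byMonth,
      transactionsByProduct    = byProduct,
      transactionsByProductZip = byZip,
      productDetails           = products)

    println(stats.totalTransactions)
  }
}
```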
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index a4d1486d..5432e3c6 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -81,7 +81,7 @@ object IOUtils {
implicit val formats = Serialization.formats(NoTypeHints)
//Read file as String, and serialize it into Stats object.
//See http://json4s.org/ examples.
- read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+ read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_))
}
/**
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index e460e2e9..9d5cb843 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -30,7 +30,7 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext(conf)
override def afterAll() {
- sc.stop();
+ sc.stop()
}
test("Full integration test.") {
@@ -40,7 +40,7 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
//stores, customers, days, randomSeed
val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0","123456789")
- SparkDriver.parseArgs(parameters);
+ SparkDriver.parseArgs(parameters)
val transactionRDD = SparkDriver.generateData(sc)
SparkDriver.writeData(transactionRDD)
@@ -49,33 +49,31 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
val etlDir:File = Files.createTempDirectory("BPSTest_ETL2").toFile()
System.out.println(etlDir.getAbsolutePath + "== "+etlDir.list())
- val (locations,stores,customers,products,transactions) = SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath));
+ val (locations,stores,customers,products,transactions) = SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath))
// assert(locations==400L) TODO : This seems to vary (325,400,)
assert(stores==10L)
assert(customers==1000L)
assert(products==55L)
//assert(transactions==45349L)
- val analyticsJson = new File(tmpDir,"analytics.json")
+
//Now do the analytics.
+ val analyticsJson = new File(tmpDir,"analytics.json")
- PetStoreStatistics.main(
- Array(
- etlDir.getAbsolutePath,
- analyticsJson.getAbsolutePath),
- sc);
+ PetStoreStatistics.run(etlDir.getAbsolutePath,
+ analyticsJson.getAbsolutePath, sc)
- val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+ val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson)
/**
* Assert some very generic features. We will refine this later once
* consistency is implemented.
* See https://github.com/rnowling/bigpetstore-data-generator/issues/38
*/
- assert(stats.totalTransaction > 5);
- //TODO : Will add more assertions here, see comment above
- assert(stats.productDetails.length > 10);
+ assert(stats.totalTransactions === transactions)
+ assert(stats.productDetails.length === products)
+ assert(stats.transactionsByMonth.length === 12)
sc.stop()
}
-} \ No newline at end of file
+}
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
index b7724ccf..d9ed3907 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -38,6 +38,6 @@ class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
test("product mapper") {
val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
- assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+ assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p)
}
}
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
index 7b1e1f5f..49ff1a4e 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -40,7 +40,7 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext(conf)
override def afterAll() {
- sc.stop();
+ sc.stop()
}
test("Saving & Loading data") {