author    jayunit100 <jayunit100@gmail.com>  2015-02-16 20:51:39 -0500
committer jayunit100 <jay@apache.org>        2015-02-17 10:45:55 -0500
commit    b7e369edd6fa7b97ef4399c56ce66b766035f008
tree      8ea6296536f20fa6fd2dc202b65076e433a4495e
parent    771ca5670a4f71bc88b5358f5c258289c49add23
BIGTOP-1653. Add queries for customer, state, and product statistics w/ d3 friendly JSON output to analytics phase.
 bigtop-bigpetstore/bigpetstore-spark/README.md                                                                       |  42
 bigtop-bigpetstore/bigpetstore-spark/build.gradle                                                                    |   3
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala  | 121
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala           |  38
 bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala             |  30
 bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala              |  23
 bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala      |  43
 7 files changed, 259 insertions(+), 41 deletions(-)
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 0ed1a055..caf4276e 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -97,3 +97,45 @@ After building the jar (see above), you can run the ETL component like so:
```
spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.etl.SparkETL bigpetstore-spark-X.jar generated_data transformed_data
```
+
+Running the SparkSQL component
+-------------------------------
+
+Once the data has been ETL'd, we can process it and run analytics on it. The classes in DataModel.scala are used to read and write
+the data model from files. To run the analytics job, which writes a JSON file at the end, run the following:
+
+```
+spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics bigpetstore-spark-X.jar transformed_data PetStoreStats.json
+```
+
+This will write a JSON file to the /tmp directory, formatted approximately as follows:
+
+```
+{
+ "totalTransaction":12,
+ "transactionsByZip":[
+ {"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
+ {"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
+ {"count":52,"productId":27,"zipcode":"94583"},{"count":84,"productId":19,"zipcode":"33027"},
+ {"count":143,"productId":0,"zipcode":"94583"},{"count":58,"productId":41,"zipcode":"72715"},
+ {"count":76,"productId":54,"zipcode":"15014"},{"count":118,"productId":52,"zipcode":"45439"}},
+ ..... (several more) ....
+ "productDetails":[
+ {
+ "productId":0,
+ "category":"kitty litter",
+ "attributes":{
+ "category":"kitty litter",
+ "brand":"Pretty Cat",
+ "size":"7.0",
+ "per_unit_cost":"1.43"
+ }
+ },
+ {
+ "productId":2,
+ "category":"dry cat food",
+ "attributes":{
+```
+
+This data is intended for a front-end web app that will display charts and summary statistics of the transactions.
+Keep tracking Apache BigTop for updates on this front!
\ No newline at end of file
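For readers who want to consume this output, here is a minimal sketch (not part of this patch) of reading the stats JSON back in Scala with json4s, which this commit adds as a dependency; the file name is assumed to be the one passed on the command line above:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical consumer-side reader for the stats file written by PetStoreStatistics.
object ReadStatsJson {
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val json = parse(scala.io.Source.fromFile("PetStoreStats.json").mkString)
    // Pull out the top-level count and the per-(product, zipcode) breakdown.
    val total = (json \ "totalTransaction").extract[Long]
    val byZip = (json \ "transactionsByZip").children
    println(s"total transactions: $total, (product, zipcode) groups: ${byZip.size}")
  }
}
```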
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 35ff7bc4..6f3b2d04 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -117,10 +117,13 @@ def updateDependencyVersion(dependencyDetails, dependencyString) {
dependencies {
compile "org.apache.spark:spark-assembly_2.10:${sparkVersion}"
compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
+ compile "joda-time:joda-time:2.7"
+ compile "org.json4s:json4s-jackson_2.10:3.1.0"
testCompile "junit:junit:4.11"
testCompile "org.hamcrest:hamcrest-all:1.3"
testCompile "org.scalatest:scalatest_2.10:2.2.1"
+ testCompile "joda-time:joda-time:2.7"
}
task listJars << {
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index 03d20122..c97bf927 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -15,32 +15,34 @@
* limitations under the License.
*/
-package org.apache.bigtop.bigpetstore.spark.generator
+package org.apache.bigtop.bigpetstore.spark.analytics
+
+
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, Statistics,Transaction}
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.apache.spark.sql._;
import scala.Nothing;
-import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
-import com.github.rnowling.bps.datagenerator.datamodels._
-import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator}
-import com.github.rnowling.bps.datagenerator.framework.SeedFactory
import org.apache.hadoop.fs.Path
import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
-import java.util.ArrayList
+import java.util.{Calendar, ArrayList, Date}
import scala.util.Random
import java.io.File
-import java.util.Date
+import org.json4s.JsonDSL.WithBigDecimal._
object PetStoreStatistics {
private def printUsage() {
val usage: String =
- "BigPetStore Analytics Module.\n" +
- "Usage: inputDir\n" ;
+ "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n " +
+ "Ouptut is a JSON file in outputDir. For schema, see the code." ;
System.err.println(usage)
}
@@ -52,44 +54,95 @@ object PetStoreStatistics {
* @param args
* @return
*/
- def parseArgs(args: Array[String]):Option[Path] = {
- if(args.length != 1) {
- printUsage();
- return None;
- }
- //success, return path.
- Some(new Path(args(0)));
+ def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
+ (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT path"); None } else Some(new Path(args(0))),
+     if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing OUTPUT path"); None } else Some(new File(args(1))))
}
- /**
- * Here we generate an RDD of all the petstore transactions,
- * by generating the static data first (stores, customers, ...)
- * followed by running the simulation as a distributed spark task.
- */
- def totalTransactions(r:(_,_,_,_,RDD[Transaction]), sc: SparkContext): Statistics = {
- return Statistics(r._5.count());
+ def productMap(r:Array[Product]) : Map[Long,Product] = {
+ r map (prod => prod.productId -> prod) toMap
+ }
+
+ def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product], RDD[Transaction]),
+ sc: SparkContext): Statistics = {
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc);
+
+ import sqlContext._;
+
+    /**
+     * Map the RDD[Transaction] -> RDD[TransactionSQL] so that we can run SparkSQL against it:
+     * the Calendar field is not directly mappable by SparkSQL, so it is flattened into
+     * SparkSQL-friendly timestamp/year/month/day/hour/minute fields.
+     */
+    val mappableTransactions:RDD[TransactionSQL] =
+      r._5.map(trans => trans.toSQL())
+
+ mappableTransactions.registerTempTable("transactions");
+
+ r._2.registerTempTable("Stores")
+
+ val results: SchemaRDD = sql("SELECT month,count(*) FROM transactions group by month")
+ val transactionsByMonth = results.collect();
+ for(x<-transactionsByMonth){
+ println(x);
+ }
+
+ val results2: SchemaRDD = sql(
+ """SELECT count(*) c, productId , zipcode
+FROM transactions t
+JOIN Stores s ON t.storeId = s.storeId
+GROUP BY productId, zipcode""")
+ val groupedProductZips = results2.collect();
+
+    // Print the grouped (productId, zipcode) counts for debugging.
+ for(x<-groupedProductZips){
+ println("grouped product:zip " + x);
+ }
+
+ return Statistics(
+      results.count(), // Total number of transactions
+ results2.collect().map(r => {
+        // Map each SparkSQL Row into a serializable case class.
+ StatisticsTrByZip(r.getLong(0),r.getLong(1),r.getString(2))
+ }),
+ r._4.collect()); // Product details.
}
/**
* We keep a "run" method which can be called easily from tests and also is used by main.
*/
- def run(transactionsInputDir:String, sc:SparkContext): Boolean = {
+ def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
System.out.println("input : " + transactionsInputDir);
- val t=totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
- System.out.println("Transaction count = " + t);
+ val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
sc.stop()
- true;
+ stats
}
- def main(args: Array[String]) {
+ def main(args: Array[String]) {
+ main(
+ args,
+ new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+ }
+
+ def main(args: Array[String], context:SparkContext) = {
// Get or else : On failure (else) we exit.
- val inputPath = parseArgs(args).getOrElse {
- System.exit(1);
- };
+ val (inputPath,outputPath)= parseArgs(args);
+
+ if(! (inputPath.isDefined && outputPath.isDefined)) {
+ printUsage()
+ System.exit(1)
+ }
System.out.println("Running w/ input = " + inputPath);
- val conf = new SparkConf().setAppName("BPS Data Generator")
- run(inputPath.toString,new SparkContext(conf));
+
+ val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+
+ IOUtils.saveLocalAsJSON(outputPath.get, stats)
+
+ System.out.println("Output JSON Stats stored : " + outputPath.get);
+
}
}
\ No newline at end of file
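The core pattern in PetStoreStatistics above is the Spark 1.x SparkSQL flow: an RDD of case classes is implicitly converted to a SchemaRDD, registered as a temp table, and queried with SQL. A minimal self-contained sketch of that pattern, with illustrative names only (not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Stand-in for TransactionSQL: a flat case class that SparkSQL can map.
case class Tx(storeId: Long, productId: Long, month: Int)

object TempTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._ // brings the RDD -> SchemaRDD implicit and sql(...) into scope

    // Register an RDD of case classes as a queryable temp table.
    val txs = sc.parallelize(Seq(Tx(1, 10, 2), Tx(1, 11, 2), Tx(2, 10, 3)))
    txs.registerTempTable("transactions")

    // Same shape as the by-month query used in totalTransactions().
    val byMonth = sql("SELECT month, count(*) FROM transactions GROUP BY month")
    byMonth.collect().foreach(println)
    sc.stop()
  }
}
```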
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 19a42602..8eaf707c 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -17,8 +17,23 @@
package org.apache.bigtop.bigpetstore.spark.datamodel
+import java.sql.Timestamp
import java.util.Calendar
+import org.apache.spark.sql
+import org.joda.time.DateTime
+import org.json4s.CustomSerializer
+import org.json4s.JsonAST.{JString, JField, JInt, JObject}
+
+/**
+ * Statistics phase. Represents the JSON emitted for the front end.
+ * Currently the transactionsByZip schema is (count, productId, zipcode).
+ */
+
+case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+
+case class Statistics(totalTransaction: Long, transactionsByZip: Array[StatisticsTrByZip], productDetails:Array[Product])
+
case class Customer(customerId: Long, firstName: String,
lastName: String, zipcode: String)
@@ -28,9 +43,24 @@ case class Product(productId: Long, category: String, attributes: Map[String, St
case class Store(storeId: Long, zipcode: String)
-case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long)
+case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long){
+  /**
+   * Convert to TransactionSQL, flattening the Calendar into SparkSQL-friendly fields.
+   * (This could possibly be expressed as an implicit conversion instead.)
+   */
+  def toSQL(): TransactionSQL = {
+    val dt = new DateTime(dateTime);
+    val ts = new Timestamp(dt.getMillis);
+    return TransactionSQL(customerId, transactionId, storeId, ts, productId,
+      dt.getYearOfEra, dt.getMonthOfYear, dt.getDayOfMonth, dt.getHourOfDay, dt.getMinuteOfHour)
+  }
+}
/**
- * Statistics phase. To be expanded...
- * */
-case class Statistics(transactions:Long)
+ * A Transaction which we can create from the natively stored transactions.
+ */
+case class TransactionSQL(customerId: Long, transactionId: Long, storeId: Long, timestamp:Timestamp, productId: Long,
+ year:Int, month:Int, day:Int, hour:Int, minute:Int )
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index 8899abd6..a4d1486d 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -17,13 +17,24 @@
package org.apache.bigtop.bigpetstore.spark.datamodel
+import java.io.File
import java.util.Date
+import java.nio.file.{Path, Paths, Files}
+import java.nio.charset.StandardCharsets
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.json4s.JsonDSL._
+import org.json4s.JsonDSL.WithDouble._
+import org.json4s.JsonDSL.WithBigDecimal._
+import org.json4s.jackson.Serialization
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.Serialization.{read, write}
/**
* Utility functions for loading and saving data model RDDs.
@@ -35,6 +46,8 @@ object IOUtils {
private val PRODUCT_DIR = "products"
private val TRANSACTION_DIR = "transactions"
+ private val ANALYTICS_STATS_DIR = "analytics_stats"
+
/**
* Save RDDs of the data model as Sequence files.
*
@@ -56,11 +69,28 @@ object IOUtils {
transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR)
}
+ def saveLocalAsJSON(outputDir: File, statistics: Statistics) {
+    // Set up the json4s serialization formats used by write().
+ implicit val formats = Serialization.formats(NoTypeHints)
+ val json:String = write(statistics)
+ Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
+ }
+
+ def readLocalAsStatistics(jsonFile: File):Statistics = {
+    // Set up the json4s serialization formats used by read().
+    implicit val formats = Serialization.formats(NoTypeHints)
+    // Read the file as a String and deserialize it into a Statistics object.
+    // See http://json4s.org/ for examples.
+ read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+ }
+
/**
* Load RDDs of the data model from Sequence files.
*
* @param sc SparkContext
* @param inputDir Directory containing Sequence files
+ *
+ * TODO: This should take a Path rather than a String; using a String makes input validation more complex.
*/
def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store],
RDD[Customer], RDD[Product], RDD[Transaction]) = {
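The JSON persistence above is plain json4s case-class serialization. A minimal round-trip sketch of the write/read pair used by saveLocalAsJSON and readLocalAsStatistics, with hand-built sample values (not part of this patch):

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}

import org.apache.bigtop.bigpetstore.spark.datamodel.{Product, Statistics, StatisticsTrByZip}

object JsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    implicit val formats = Serialization.formats(NoTypeHints)

    val stats = Statistics(
      totalTransaction = 2L,
      transactionsByZip = Array(StatisticsTrByZip(2L, 1L, "11368")),
      productDetails = Array(Product(1L, "kitty litter", Map("brand" -> "Pretty Cat"))))

    val json = write(stats)           // case classes -> JSON string
    val back = read[Statistics](json) // JSON string -> case classes
    println(back.totalTransaction)    // prints 2
  }
}
```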
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index 747a4775..e460e2e9 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -1,9 +1,10 @@
package org.apache.bigpetstore.spark
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils}
import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters
import org.apache.bigtop.bigpetstore.spark.etl.SparkETL
import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL}
-import org.apache.bigtop.bigpetstore.spark.generator.PetStoreStatistics
import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
@@ -55,9 +56,25 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll {
assert(customers==1000L)
assert(products==55L)
//assert(transactions==45349L)
-
+ val analyticsJson = new File(tmpDir,"analytics.json")
//Now do the analytics.
- PetStoreStatistics.run(etlDir.getAbsolutePath, sc);
+
+ PetStoreStatistics.main(
+ Array(
+ etlDir.getAbsolutePath,
+ analyticsJson.getAbsolutePath),
+ sc);
+
+ val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+
+ /**
+ * Assert some very generic features. We will refine this later once
+ * consistency is implemented.
+ * See https://github.com/rnowling/bigpetstore-data-generator/issues/38
+ */
+ assert(stats.totalTransaction > 5);
+ //TODO : Will add more assertions here, see comment above
+ assert(stats.productDetails.length > 10);
sc.stop()
}
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
new file mode 100644
index 00000000..b7724ccf
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.bigpetstore.spark.analytics
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.common.collect.ImmutableMap
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.Product
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+import Array._
+
+import java.io.File
+import java.nio.file.Files
+import java.util.Calendar
+import java.util.Locale
+
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
+
+ test("product mapper") {
+ val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
+ assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+ }
+}