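1. Example
The comments below describe what this example demonstrates. It mainly processes a list of transaction records: it selects the valid records and computes statistics over them.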
//1. The transaction records arrive as comma-separated values: account number, then amount.
scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, SB10010,7000, CR10001,7000, SB10002,-10)
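Every later step re-splits each `account,amount` string on the comma. The sketch below is plain Scala with no Spark, and it assumes well-formed input. It parses each record once into a typed value. The `Txn` case class and the `parse` helper are hypothetical names that do not appear in the original session.

```scala
// Hypothetical typed view of one "account,amount" record (not in the original post).
final case class Txn(account: String, amount: Double)

object TxnParsing {
  // Splits one raw record on the comma and converts the amount to Double.
  // Throws IllegalArgumentException if the record does not have exactly two fields.
  def parse(record: String): Txn = record.split(",") match {
    case Array(account, amount) => Txn(account, amount.toDouble)
    case _ => throw new IllegalArgumentException(s"Malformed record: $record")
  }

  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Parse every record once instead of re-splitting it in each step.
  val parsed: Array[Txn] = acTransList.map(parse)
}
```

In the shell, the same helper could be applied with `acTransRDD.map(TxnParsing.parse)` to get an `RDD[Txn]`. The later filters could then test fields such as `_.amount > 0` directly.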
//acTransRDD is an RDD created from the array. sc is the SparkContext that the Spark shell creates in the driver program. parallelize distributes the elements of the local collection across partitions so that they form a distributed dataset. In other words, the driver is instructed to build a parallel collection (an RDD) from the given values.
scala> val acTransRDD = sc.parallelize(acTransList)
acTransRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
//2. Keep only the good transaction records from the list. The account number should start with SB and the transaction amount should be greater than zero.
scala> val goodTransRecords = acTransRDD.filter(_.split(",")(1).toDouble > 0).filter(_.split(",")(0).startsWith("SB"))
goodTransRecords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25
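The two chained `filter` calls keep a record only when both conditions hold, so together they are equivalent to a single `filter` that uses `&&`. The minimal sketch below applies the same logic to a local array and splits each record only once. It is plain Scala with no Spark, and the `GoodRecords` and `isGood` names are hypothetical.

```scala
object GoodRecords {
  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // A record is good when the amount is positive and the account starts with "SB".
  def isGood(trans: String): Boolean = {
    val fields = trans.split(",")
    fields(1).toDouble > 0 && fields(0).startsWith("SB")
  }

  // Local equivalent of acTransRDD.filter(...).filter(...).
  val goodTransRecords: Array[String] = acTransList.filter(isGood)
}
```

Ten of the twelve records survive. Only `CR10001,7000` (bad account) and `SB10002,-10` (bad amount) are dropped.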
//3. Find all the high value transaction records with a transaction amount greater than 1000.
scala> val highValueTransRecords = goodTransRecords.filter(_.split(",")(1).toDouble > 1000)
highValueTransRecords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:25
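The comparison is strict (`> 1000`), so SB10001's record of exactly 1000 is not a high-value record. The sketch below applies the same two stages to the sample data and shows which records `highValueTransRecords` should contain. It is plain Scala with no Spark, and the object name is hypothetical.

```scala
object HighValueRecords {
  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Good records: positive amount and an "SB" account number.
  val goodTransRecords: Array[String] = acTransList.filter { trans =>
    val fields = trans.split(",")
    fields(1).toDouble > 0 && fields(0).startsWith("SB")
  }

  // Strictly greater than 1000, so "SB10001,1000" is not included.
  val highValueTransRecords: Array[String] =
    goodTransRecords.filter(_.split(",")(1).toDouble > 1000)
}
```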
//4. Find all the transaction records where the transaction amount is less than or equal to zero.
scala> val badAmountLambda = (trans: String) => trans.split(",")(1).toDouble <= 0
badAmountLambda: String => Boolean = <function1>
//5. Find all the transaction records where the account number is bad, that is, it does not start with SB.
scala> val badAcNoLambda = (trans: String) => trans.split(",")(0).startsWith("SB") == false
badAcNoLambda: String => Boolean = <function1>
scala> val badAmountRecords = acTransRDD.filter(badAmountLambda)
badAmountRecords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:27
scala> val badAccountRecords = acTransRDD.filter(badAcNoLambda)
badAccountRecords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:27
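Here the predicates are stored as function values of type `String => Boolean` and then passed to `filter`, so the same test can be reused on any collection. The sketch below applies the two filters to a local array. It is plain Scala with no Spark, and the object name is hypothetical. It uses `!` instead of `== false`; the two forms are equivalent.

```scala
object BadRecords {
  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Function values with the same logic as in the REPL session.
  val badAmountLambda: String => Boolean = trans => trans.split(",")(1).toDouble <= 0
  val badAcNoLambda: String => Boolean = trans => !trans.split(",")(0).startsWith("SB")

  val badAmountRecords: Array[String] = acTransList.filter(badAmountLambda)
  val badAccountRecords: Array[String] = acTransList.filter(badAcNoLambda)
}
```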
//6. Find a combined list of all the bad transaction records.
scala> val badTransRecords = badAmountRecords.union(badAccountRecords)
badTransRecords: org.apache.spark.rdd.RDD[String] = UnionRDD[6] at union at <console>:27
scala>
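`union` concatenates the two RDDs and, like SQL `UNION ALL`, does not remove duplicates. That makes no difference for this data, because no record has both a bad amount and a bad account number. A record that failed both checks, however, would appear twice. The sketch below uses `++` on local lists as a stand-in for `union`. The extra record `CR10002,-5` is hypothetical and is added only to show the duplicate. `distinct` removes the duplicate, and Spark RDDs also provide a `distinct()` method.

```scala
object BadUnion {
  // The same sample records as acTransList in the REPL session.
  val acTransList: List[String] = List(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  val badAmountLambda: String => Boolean = trans => trans.split(",")(1).toDouble <= 0
  val badAcNoLambda: String => Boolean = trans => !trans.split(",")(0).startsWith("SB")

  // Local stand-in for badAmountRecords.union(badAccountRecords): keeps duplicates.
  def badTransRecords(records: List[String]): List[String] =
    records.filter(badAmountLambda) ++ records.filter(badAcNoLambda)

  // Hypothetical record (not in the original data) that fails both checks.
  val withOverlap: List[String] = acTransList :+ "CR10002,-5"
}
```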
Now let's look at the results, which means starting to run Spark actions. Everything above consists only of Spark transformations; no Spark action has run yet. For that reason, the Spark web UI for this application shows nothing at this point. The real activity starts only after the first Spark action is executed.
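Spark transformations are lazy. They only record how to compute a new RDD, and nothing runs until an action such as `reduce` or `collect` asks for a result. Scala's own collection views behave in a similar way, which lets you see the idea without a cluster. This is an analogy in plain Scala, not Spark, and `evaluated` is a hypothetical counter. The filter function runs only when the view is forced:

```scala
object LazinessSketch {
  // The same sample records as acTransList in the REPL session.
  val acTransList: List[String] = List(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Counts how many times the predicate actually runs.
  var evaluated: Int = 0

  // Analogous to a transformation: building the view runs no predicate yet.
  val positiveView = acTransList.view.filter { trans =>
    evaluated += 1
    trans.split(",")(1).toDouble > 0
  }

  // Analogous to an action: forcing the view runs the predicate on every record.
  def force(): List[String] = positiveView.toList
}
```

Calling `force()` a second time runs the predicate another 12 times. Spark behaves the same way: it recomputes an uncached RDD from its lineage for each action, unless `cache()` or `persist()` is used.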
Next, add more processing steps:
//7. Find the total of all the good transaction amounts.
scala> val sumAmount = goodTransRecords.map(trans => trans.split(",")(1).toDouble).reduce(_ + _)
sumAmount: Double = 28486.0
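`reduce` is an action. It combines elements pairwise with a binary function, first within each partition and then across the partial results, so the function should be associative and commutative. Addition is both. The sketch below reproduces the 28486.0 total from the local data. It is plain Scala with no Spark, and the object name is hypothetical.

```scala
object SumSketch {
  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Amounts of the good records only (positive amount, "SB" account).
  val goodAmounts: Array[Double] = acTransList
    .map(_.split(","))
    .filter(fields => fields(1).toDouble > 0 && fields(0).startsWith("SB"))
    .map(fields => fields(1).toDouble)

  // Pairwise addition, the same reduction as in the REPL session.
  val sumAmount: Double = goodAmounts.reduce(_ + _)
}
```

Both Scala's `reduce` and Spark's `RDD.reduce` throw an exception on an empty collection. `goodAmounts.sum` locally, or `sum()` on an `RDD[Double]` in Spark, returns 0.0 instead.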
//8. Find the maximum of all the good transaction amounts.
scala> val maxAmount = goodTransRecords.map(trans => trans.split(",")(1).toDouble).reduce((a, b) => if (a > b) a else b)
maxAmount: Double = 10000.0
//9. Find the minimum of all the good transaction amounts.
scala> val minAmount = goodTransRecords.map(trans => trans.split(",")(1).toDouble).reduce((a, b) => if (a < b) a else b)
minAmount: Double = 30.0
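Steps 8 and 9 are separate actions, so Spark runs two jobs. Because `goodTransRecords` is not cached, each job also re-filters the data from scratch. As an alternative, both values can come from a single `reduce` over (min, max) pairs, and the same `map` + `reduce` shape would also work on an RDD. The sketch below is plain Scala with no Spark, and its names are hypothetical.

```scala
object MinMaxSketch {
  // The same sample records as acTransList in the REPL session.
  val acTransList: Array[String] = Array(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Amounts of the good records only (positive amount, "SB" account).
  val goodAmounts: Array[Double] = acTransList
    .map(_.split(","))
    .filter(fields => fields(1).toDouble > 0 && fields(0).startsWith("SB"))
    .map(fields => fields(1).toDouble)

  // Pair each amount with itself, then merge pairs, keeping the smaller
  // first component and the larger second component.
  val (minAmount, maxAmount) = goodAmounts
    .map(a => (a, a))
    .reduce((x, y) => (math.min(x._1, y._1), math.max(x._2, y._2)))
}
```

For a single statistic, `max()` and `min()` on the RDD are shorter equivalents of the `if`/`else` reductions. For an `RDD[Double]`, Spark's `stats()` returns a `StatCounter` that also carries the count, sum, min and max from one pass.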
//10. Find all the good account numbers (those that start with SB).
scala> val combineAllElements = acTransRDD.flatMap(trans => trans.split(","))
combineAllElements: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:25
scala> val allGoodAccountNos = combineAllElements.filter(_.startsWith("SB"))
allGoodAccountNos: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25
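`flatMap` splits every record and flattens the resulting arrays, so account numbers and amounts become separate elements of one RDD of 24 strings. The `filter` then keeps the tokens that start with `SB`. This works only because no amount string begins with `SB`. The sketch below contrasts that approach with taking the account field directly through `map`. It is plain Scala with no Spark, and its names are hypothetical.

```scala
object AccountNumbers {
  // The same sample records as acTransList in the REPL session.
  val acTransList: List[String] = List(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // flatMap approach from the REPL session: all tokens, then keep "SB" tokens.
  val combineAllElements: List[String] = acTransList.flatMap(_.split(",").toList)
  val viaFlatMap: List[String] = combineAllElements.filter(_.startsWith("SB"))

  // Alternative: take the account field by position, then filter.
  val viaMap: List[String] = acTransList.map(_.split(",")(0)).filter(_.startsWith("SB"))
}
```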
scala> combineAllElements.collect()
res6: Array[String] = Array(SB10001, 1000, SB10002, 1200, SB10003, 8000, SB10004, 400, SB10005, 300, SB10006, 10000, SB10007, 500, SB10008, 56, SB10009, 30, SB10010, 7000, CR10001, 7000, SB10002, -10)
scala> allGoodAccountNos.distinct().collect()
res7: Array[String] = Array(SB10006, SB10002, SB10010, SB10003, SB10007, SB10008, SB10004, SB10009, SB10001, SB10005)
scala>
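`distinct()` removes the duplicate SB10002, which appears in two records, and leaves 10 account numbers. In Spark, `distinct` involves a shuffle, so the result does not keep the input order; this is why res7 looks scrambled. Scala's local `distinct` keeps first-occurrence order instead. The sketch below sorts the result so that it does not depend on order. It is plain Scala with no Spark, and its names are hypothetical.

```scala
object DistinctAccounts {
  // The same sample records as acTransList in the REPL session.
  val acTransList: List[String] = List(
    "SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
    "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56",
    "SB10009,30", "SB10010,7000", "CR10001,7000", "SB10002,-10")

  // Account field of every record that starts with "SB" (SB10002 appears twice).
  val allGoodAccountNos: List[String] =
    acTransList.map(_.split(",")(0)).filter(_.startsWith("SB"))

  // Remove duplicates, then sort so the result does not depend on input order.
  val distinctSorted: List[String] = allGoodAccountNos.distinct.sorted
}
```

On the RDD, `allGoodAccountNos.distinct().collect().sorted` would give the same account numbers in a stable order.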