This is the same use case as 005, but it is implemented with a completely different programming model.
1. SQL queries
yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/spark-shell
19/12/01 17:42:52 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
19/12/01 17:42:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/12/01 17:42:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1575193384359).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> // Define the case classes for using in conjunction with DataFrames
scala> case class Trans(accNo: String, tranAmount: Double)
defined class Trans
scala> // Functions to convert the sequence of strings to objects defined by the case classes
scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
toTrans: Seq[String] => Trans
scala> // Creation of the list from where the RDD is going to be created
scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, SB10010,7000, CR10001,7000, SB10002,-10)
scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))
acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:28
scala> // Convert RDD to DataFrame
scala> val acTransDF = spark.createDataFrame(acTransRDD)
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
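As an aside, since spark-shell auto-imports spark.implicits._, the same DataFrame could also be built with the implicit toDF() conversion, which is the style used later in the aggregation example. A minimal sketch, assuming the same shell session:
// Equivalent construction via the implicit toDF() conversion on an RDD of case-class objects
val acTransDFviaToDF = acTransRDD.toDF()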
scala> // Register temporary view in the DataFrame for using it in SQL
scala> acTransDF.createOrReplaceTempView("trans")
scala> // Print the structure of the DataFrame
scala> acTransDF.printSchema
root
|-- accNo: string (nullable = true)
|-- tranAmount: double (nullable = false)
scala> // Show the first few records of the DataFrame
scala> acTransDF.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Use SQL to create another DataFrame containing the good transaction records
// This line is my own note: the spark object used below is of type SparkSession
scala> val goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
19/12/01 19:26:29 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/12/01 19:26:29 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/12/01 19:26:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
goodTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
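A note on the spark object used above: in spark-shell and pyspark the session is created for you at startup. In a standalone application it has to be built explicitly. A minimal sketch, not part of the original session, with an assumed application name:
// Building a SparkSession outside the shell (the application name is illustrative)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("TransactionAnalysis").master("local[*]").getOrCreate()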
scala> // Register temporary view in the DataFrame for using it in SQL
scala> goodTransRecords.createOrReplaceTempView("goodtrans")
scala> // Show the first few records of the DataFrame
scala> goodTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
scala> // Use SQL to create another DataFrame containing the high value transaction records
scala> val highValueTransRecords = spark.sql("SELECT accNo, tranAmount FROM goodtrans WHERE tranAmount > 1000")
highValueTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> // Show the first few records of the DataFrame
scala> highValueTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
|SB10010| 7000.0|
+-------+----------+
scala> // Use SQL to create another DataFrame containing the bad account records
scala> val badAccountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'")
badAccountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> // Show the first few records of the DataFrame
scala> badAccountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
scala> // Use SQL to create another DataFrame containing the bad amount records
scala> val badAmountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0")
badAmountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> // Show the first few records of the DataFrame
scala> badAmountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
scala> // Do the union of two DataFrames and create another DataFrame
scala> val badTransRecords = badAccountRecords.union(badAmountRecords)
badTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> // Show the first few records of the DataFrame
scala> badTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Calculate the sum
scala> val sumAmount = spark.sql("SELECT sum(tranAmount) as sum FROM goodtrans")
sumAmount: org.apache.spark.sql.DataFrame = [sum: double]
scala> // Show the first few records of the DataFrame
scala> sumAmount.show
+-------+
| sum|
+-------+
|28486.0|
+-------+
scala> // Calculate the maximum
scala> val maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
maxAmount: org.apache.spark.sql.DataFrame = [max: double]
scala> // Show the first few records of the DataFrame
scala> maxAmount.show
+-------+
| max|
+-------+
|10000.0|
+-------+
scala> // Calculate the minimum
scala> val minAmount = spark.sql("SELECT min(tranAmount) as min FROM goodtrans")
minAmount: org.apache.spark.sql.DataFrame = [min: double]
scala> // Show the first few records of the DataFrame
scala> minAmount.show
+----+
| min|
+----+
|30.0|
+----+
scala> // Use SQL to create another DataFrame containing the good account numbers
scala> val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
goodAccNos: org.apache.spark.sql.DataFrame = [accNo: string]
scala> // Show the first few records of the DataFrame
scala> goodAccNos.show
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+
scala> // Calculate the aggregates using mixing of DataFrame and RDD like operations
scala> val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_ + _)
sumAmountByMixing: Double = 28486.0
scala> val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a > b) a else b)
maxAmountByMixing: Double = 10000.0
scala> val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a < b) a else b)
minAmountByMixing: Double = 30.0
scala>
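For comparison, the same reduce-style aggregates can also be written against the typed Dataset API by converting the DataFrame back into Dataset[Trans]. A minimal sketch, assuming the same shell session (spark.implicits._ is auto-imported there):
// Typed alternative to getAs[Double]: operate on Trans objects directly
val goodTransDS = goodTransRecords.as[Trans]
val sumTyped = goodTransDS.map(_.tranAmount).reduce(_ + _)
val maxTyped = goodTransDS.map(_.tranAmount).reduce((a, b) => if (a > b) a else b)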
Below is the same example written in PySpark.
yay@yay-ThinkPad-T470-W10DG:~$ cd $SPARK_HOME
yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/pyspark
Python 2.7.15+ (default, Oct 7 2019, 17:39:04)
[GCC 7.4.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
19/12/01 20:04:03 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
19/12/01 20:04:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/12/01 20:04:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/12/01 20:04:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Python version 2.7.15+ (default, Oct 7 2019 17:39:04)
SparkSession available as 'spark'.
>>> from pyspark.sql import Row
>>> # Creation of the list from where the RDD is going to be created
... acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000","SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56","SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
>>> # Create the DataFrame
... acTransDF = sc.parallelize(acTransList).map(lambda trans: trans.split(",")).map(lambda p: Row(accNo=p[0],tranAmount=float(p[1]))).toDF()
>>> # Register temporary view in the DataFrame for using it in SQL
... acTransDF.createOrReplaceTempView("trans")
>>> # Print the structure of the DataFrame
... acTransDF.printSchema()
root
|-- accNo: string (nullable = true)
|-- tranAmount: double (nullable = true)
>>> # Show the first few records of the DataFrame
... acTransDF.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
>>> # Use SQL to create another DataFrame containing the good transaction records
... goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
19/12/01 20:10:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
>>> # Register temporary table in the DataFrame for using it in SQL
... goodTransRecords.createOrReplaceTempView("goodtrans")
>>> # Show the first few records of the DataFrame
... goodTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
>>> # Use SQL to create another DataFrame containing the high value transaction records
... highValueTransRecords = spark.sql("SELECT accNo, tranAmount FROM goodtrans WHERE tranAmount > 1000")
>>> # Show the first few records of the DataFrame
... highValueTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
|SB10010| 7000.0|
+-------+----------+
>>> # Use SQL to create another DataFrame containing the bad account records
... badAccountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'")
>>> # Show the first few records of the DataFrame
... badAccountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
>>> # Use SQL to create another DataFrame containing the bad amount records
... badAmountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0")
>>> # Show the first few records of the DataFrame
... badAmountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
>>> # Do the union of two DataFrames and create another DataFrame
... badTransRecords = badAccountRecords.union(badAmountRecords)
>>> # Show the first few records of the DataFrame
... badTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
>>> # Calculate the sum
... sumAmount = spark.sql("SELECT sum(tranAmount) as sum FROM goodtrans")
>>> # Show the first few records of the DataFrame
... sumAmount.show()
+-------+
| sum|
+-------+
|28486.0|
+-------+
>>> # Calculate the maximum
... maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
>>> # Show the first few records of the DataFrame
... maxAmount.show()
+-------+
| max|
+-------+
|10000.0|
+-------+
>>> # Calculate the minimum
... minAmount = spark.sql("SELECT min(tranAmount) as min FROM goodtrans")
>>> # Show the first few records of the DataFrame
... minAmount.show()
+----+
| min|
+----+
|30.0|
+----+
>>> # Use SQL to create another DataFrame containing the good account numbers
... goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
>>> # Show the first few records of the DataFrame
... goodAccNos.show()
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+
>>> # Calculate the sum using mixing of DataFrame and RDD like operations
... sumAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a+b)
>>> sumAmountByMixing
28486.0
>>> # Calculate the maximum using mixing of DataFrame and RDD like operations
... maxAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a > b else b)
>>> maxAmountByMixing
10000.0
>>> # Calculate the minimum using mixing of DataFrame and RDD like operations
... minAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a < b else b)
>>> minAmountByMixing
30.0
>>>
2. Using the DataFrame API
scala> case class Trans(accNo: String, tranAmount: Double)
defined class Trans
scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
toTrans: Seq[String] => Trans
scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, SB10010,7000, CR10001,7000, SB10002,-10)
scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))
acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:28
scala> val acTransDF = spark.createDataFrame(acTransRDD)
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala>
scala> acTransDF.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> val goodTransRecords = acTransDF.filter("accNo like 'SB%'").filter("tranAmount > 0")
goodTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> goodTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
scala> val highValueTransRecords = goodTransRecords.filter("tranAmount > 1000")
highValueTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> highValueTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
|SB10010| 7000.0|
+-------+----------+
scala> val badAccountRecords = acTransDF.filter("accNo NOT like 'SB%'")
badAccountRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> badAccountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
scala> val badAmountRecords = acTransDF.filter("tranAmount < 0")
badAmountRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> badAmountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
scala> val badTransRecords = badAccountRecords.union(badAmountRecords)
badTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
scala> badTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Calculate the aggregates in one shot
scala> val aggregates = goodTransRecords.agg(sum("tranAmount"), max("tranAmount"), min("tranAmount"))
aggregates: org.apache.spark.sql.DataFrame = [sum(tranAmount): double, max(tranAmount): double ... 1 more field]
scala> aggregates.show
+---------------+---------------+---------------+
|sum(tranAmount)|max(tranAmount)|min(tranAmount)|
+---------------+---------------+---------------+
| 28486.0| 10000.0| 30.0|
+---------------+---------------+---------------+
scala> val goodAccNos = acTransDF.filter("accNo like 'SB%'").select("accNo").distinct().orderBy("accNo")
goodAccNos: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string]
scala> goodAccNos.show
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+
scala> // Persist the data of the DataFrame into a Parquet file
scala> acTransDF.write.parquet("scala.trans.parquet")
scala> // Read the data into a DataFrame from the Parquet file
scala> val acTransDFfromParquet = spark.read.parquet("scala.trans.parquet")
acTransDFfromParquet: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> acTransDFfromParquet.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
+-------+----------+
scala>
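Two things are worth noting about the Parquet round trip above: the row order is not preserved, as the show output illustrates, and re-running write.parquet against a path that already exists fails by default. A save mode can be set explicitly; a minimal sketch using the same path:
// Overwrite the existing Parquet directory instead of failing on a second run
acTransDF.write.mode("overwrite").parquet("scala.trans.parquet")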
3. Understanding Aggregations in Spark SQL
scala> case class Trans(accNo: String, tranAmount: Double)
defined class Trans
scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
toTrans: Seq[String] => Trans
scala> val acTransList = Array("SB10001,1000", "SB10002,1200","SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000","SB10004,500","SB10005,56", "SB10003,30","SB10002,7000","SB10001,-100", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)
scala> // Create the DataFrame
scala> val acTransDF = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_)).toDF()
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> acTransDF.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10001| 8000.0|
|SB10002| 400.0|
|SB10003| 300.0|
|SB10001| 10000.0|
|SB10004| 500.0|
|SB10005| 56.0|
|SB10003| 30.0|
|SB10002| 7000.0|
|SB10001| -100.0|
|SB10002| -10.0|
+-------+----------+
scala> acTransDF.createOrReplaceTempView("trans")
scala> val acSummary = spark.sql("SELECT accNo, sum(tranAmount) as TransTotal FROM trans GROUP BY accNo")
acSummary: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
scala> acSummary.show
+-------+----------+
| accNo|TransTotal|
+-------+----------+
|SB10005| 56.0|
|SB10004| 500.0|
|SB10003| 330.0|
|SB10002| 8590.0|
|SB10001| 18900.0|
+-------+----------+
scala> // Create the DataFrame using API for the account summary records
scala> val acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg(sum("tranAmount") as "TransTotal")
acSummaryViaDFAPI: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
scala> acSummaryViaDFAPI.show
+-------+----------+
| accNo|TransTotal|
+-------+----------+
|SB10005| 56.0|
|SB10004| 500.0|
|SB10003| 330.0|
|SB10002| 8590.0|
|SB10001| 18900.0|
+-------+----------+
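The same groupBy/agg pattern accepts any of the built-in aggregate functions, not just sum. A minimal sketch extending the same session (outside spark-shell, import org.apache.spark.sql.functions._ first):
// Count, average, and maximum per account, computed in a single pass
val acSummaryMore = acTransDF.groupBy("accNo").agg(count("tranAmount") as "TxnCount", avg("tranAmount") as "AvgAmount", max("tranAmount") as "MaxAmount")
acSummaryMore.show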
4. Multi-datasource joining with Spark SQL
In the previous chapter, joining multiple RDDs by key was discussed. In this section, the same use case is implemented using Spark SQL. The use case chosen to illustrate joining multiple datasets on a key is as follows:
The first dataset contains a retail banking master-record summary with account number, first name, and last name. The second dataset contains the retail banking account balance with account number and balance amount. The key in both datasets is the account number. Join the two datasets and create one dataset containing account number, first name, last name, and balance amount.
scala> case class AcMaster(accNo: String, firstName: String, lastName: String)
defined class AcMaster
scala> case class AcBal(accNo: String, balanceAmount: Double)
defined class AcBal
scala> def toAcMaster = (master: Seq[String]) => AcMaster(master(0), master(1), master(2))
toAcMaster: Seq[String] => AcMaster
scala> def toAcBal = (bal: Seq[String]) => AcBal(bal(0), bal(1).trim.toDouble)
toAcBal: Seq[String] => AcBal
scala> val acMasterList = Array("SB10001,Roger,Federer","SB10002,Pete,Sampras","SB10003,Rafael,Nadal","SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
acMasterList: Array[String] = Array(SB10001,Roger,Federer, SB10002,Pete,Sampras, SB10003,Rafael,Nadal, SB10004,Boris,Becker, SB10005,Ivan,Lendl)
scala> val acBalList = Array("SB10001,50000","SB10002,12000","SB10003,3000", "SB10004,8500", "SB10005,5000")
acBalList: Array[String] = Array(SB10001,50000, SB10002,12000, SB10003,3000, SB10004,8500, SB10005,5000)
scala> val acMasterDF = sc.parallelize(acMasterList).map(_.split(",")).map(toAcMaster(_)).toDF()
acMasterDF: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 1 more field]
scala> val acBalDF = sc.parallelize(acBalList).map(_.split(",")).map(toAcBal(_)).toDF()
acBalDF: org.apache.spark.sql.DataFrame = [accNo: string, balanceAmount: double]
scala> acMasterDF.write.parquet("scala.master.parquet")
scala> acBalDF.write.json("scalaMaster.json")
scala> val acMasterDFFromFile = spark.read.parquet("scala.master.parquet")
acMasterDFFromFile: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 1 more field]
scala> acMasterDFFromFile.createOrReplaceTempView("master")
scala> val acBalDFFromFile = spark.read.json("scalaMaster.json")
acBalDFFromFile: org.apache.spark.sql.DataFrame = [accNo: string, balanceAmount: double]
scala> acBalDFFromFile.createOrReplaceTempView("balance")
scala> val acDetail = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC")
acDetail: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 2 more fields]
scala> acDetail.show
+-------+---------+--------+-------------+
| accNo|firstName|lastName|balanceAmount|
+-------+---------+--------+-------------+
|SB10001| Roger| Federer| 50000.0|
|SB10002| Pete| Sampras| 12000.0|
|SB10004| Boris| Becker| 8500.0|
|SB10005| Ivan| Lendl| 5000.0|
|SB10003| Rafael| Nadal| 3000.0|
+-------+---------+--------+-------------+
scala> // Use SQL to create another DataFrame containing the top 3 account detail records
scala> val acDetailTop3 = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC").limit(3)
acDetailTop3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, firstName: string ... 2 more fields]
scala> acDetailTop3.show
+-------+---------+--------+-------------+
| accNo|firstName|lastName|balanceAmount|
+-------+---------+--------+-------------+
|SB10001| Roger| Federer| 50000.0|
|SB10002| Pete| Sampras| 12000.0|
|SB10004| Boris| Becker| 8500.0|
+-------+---------+--------+-------------+
The same example can also be written with the DataFrame API:
scala> val acDetailFromAPI = acMasterDFFromFile.join(acBalDFFromFile, acMasterDFFromFile("accNo") === acBalDFFromFile("accNo"), "inner").sort($"balanceAmount".desc).select(acMasterDFFromFile("accNo"), acMasterDFFromFile("firstName"), acMasterDFFromFile("lastName"),acBalDFFromFile("balanceAmount"))
acDetailFromAPI: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 2 more fields]
scala> acDetailFromAPI.show
+-------+---------+--------+-------------+
| accNo|firstName|lastName|balanceAmount|
+-------+---------+--------+-------------+
|SB10001| Roger| Federer| 50000.0|
|SB10002| Pete| Sampras| 12000.0|
|SB10004| Boris| Becker| 8500.0|
|SB10005| Ivan| Lendl| 5000.0|
|SB10003| Rafael| Nadal| 3000.0|
+-------+---------+--------+-------------+
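The top-3 variant shown earlier with SQL has a direct DataFrame API counterpart as well; a minimal sketch applied to the already sorted join result:
// Take the three highest balances from the sorted join result
val acDetailTop3FromAPI = acDetailFromAPI.limit(3)
acDetailTop3FromAPI.show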