
006 Spark SQL way of programming

Author: 逸章 | Published 2019-12-01 17:50

    This is the same use case as in 005, but implemented with a completely different programming model.

    1. SQL queries

    yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/spark-shell 
    19/12/01 17:42:52 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
    19/12/01 17:42:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    19/12/01 17:42:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://192.168.1.16:4040
    Spark context available as 'sc' (master = local[*], app id = local-1575193384359).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
          /_/
             
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    
    scala> // Define the case classes for using in conjunction with DataFrames
    
    scala> case class Trans(accNo: String, tranAmount: Double)
    defined class Trans
    
    scala> // Functions to convert the sequence of strings to objects defined by the case classes
    
    scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
    toTrans: Seq[String] => Trans
    
    scala> // Creation of the list from where the RDD is going to be created
    
    scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
    acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, SB10010,7000, CR10001,7000, SB10002,-10)
    
    scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))
    acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:28
    
    scala> // Convert RDD to DataFrame
    
    scala> val acTransDF = spark.createDataFrame(acTransRDD)
    acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> // Register temporary view in the DataFrame for using it in SQL
    
    scala> acTransDF.createOrReplaceTempView("trans")
    
    scala> // Print the structure of the DataFrame
    
    scala> acTransDF.printSchema
    root
     |-- accNo: string (nullable = true)
     |-- tranAmount: double (nullable = false)
    
    
    scala> // Show the first few records of the DataFrame
    
    scala> acTransDF.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> // Use SQL to create another DataFrame containing the good transaction records
    // (Author's note) The spark object used in the next line is the SparkSession that spark-shell creates automatically; see the standalone-application sketch after this transcript.
    scala> val goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
    19/12/01 19:26:29 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    19/12/01 19:26:29 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
    19/12/01 19:26:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    goodTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> // Register temporary view in the DataFrame for using it in SQL
    
    scala> goodTransRecords.createOrReplaceTempView("goodtrans")
    
    scala> // Show the first few records of the DataFrame
    
    scala> goodTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    
    scala> // Use SQL to create another DataFrame containing the high value transaction records
    
    scala> val highValueTransRecords = spark.sql("SELECT accNo, tranAmount FROM goodtrans WHERE tranAmount > 1000")
    highValueTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> highValueTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10006|   10000.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    
    scala> // Use SQL to create another DataFrame containing the bad account records
    
    scala> val badAccountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'")
    badAccountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> badAccountRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    +-------+----------+
    
    
    scala> // Use SQL to create another DataFrame containing the bad amount records
    
    scala> val badAmountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0")
    badAmountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> badAmountRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> // Do the union of two DataFrames and create another DataFrame
    
    scala> val badTransRecords = badAccountRecords.union(badAmountRecords)
    badTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> badTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> // Calculate the sum
    
    scala> val sumAmount = spark.sql("SELECT sum(tranAmount) as sum FROM goodtrans")
    sumAmount: org.apache.spark.sql.DataFrame = [sum: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> sumAmount.show
    +-------+
    |    sum|
    +-------+
    |28486.0|
    +-------+
    
    
    
    scala> // Calculate the maximum
    
    scala> val maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
    maxAmount: org.apache.spark.sql.DataFrame = [max: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> maxAmount.show
    +-------+
    |    max|
    +-------+
    |10000.0|
    +-------+
    
    
    scala> // Calculate the minimum
    
    scala> val minAmount = spark.sql("SELECT min(tranAmount) as min FROM goodtrans")
    minAmount: org.apache.spark.sql.DataFrame = [min: double]
    
    scala> // Show the first few records of the DataFrame
    
    scala> minAmount.show
    +----+
    | min|
    +----+
    |30.0|
    +----+
    
    
    scala> // Use SQL to create another DataFrame containing the good account numbers
    
    scala> val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
    goodAccNos: org.apache.spark.sql.DataFrame = [accNo: string]
    
    scala> // Show the first few records of the DataFrame
    
    scala> goodAccNos.show
    +-------+                                                                       
    |  accNo|
    +-------+
    |SB10001|
    |SB10002|
    |SB10003|
    |SB10004|
    |SB10005|
    |SB10006|
    |SB10007|
    |SB10008|
    |SB10009|
    |SB10010|
    +-------+
    
    
    scala> // Calculate the aggregates using mixing of DataFrame and RDD like operations
    
    scala> val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_ + _)
    sumAmountByMixing: Double = 28486.0
    
    scala> val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a > b) a else b)
    maxAmountByMixing: Double = 10000.0
    
    scala> val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a < b) a else b)
    minAmountByMixing: Double = 30.0
    
    scala> 
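    In spark-shell both sc and spark come pre-built. In a standalone application the SparkSession has to be constructed explicitly; a minimal sketch is shown below (the object name TransApp and the local master are illustrative assumptions, the rest is the same code as in the transcript above).

    import org.apache.spark.sql.SparkSession

    object TransApp {                        // hypothetical object name
      def main(args: Array[String]): Unit = {
        // spark-shell builds this for us; a standalone app must do it explicitly
        val spark = SparkSession.builder()
          .appName("TransApp")
          .master("local[*]")                // assumption: local run, as in the shell session above
          .getOrCreate()
        import spark.implicits._             // needed for toDF / $"col" outside the shell

        val sc = spark.sparkContext          // the shell's 'sc' is just this

        // ... same DataFrame creation and SQL code as in the transcript ...

        spark.stop()
      }
    }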
    
    

    Below is the same flow written in PySpark.

    yay@yay-ThinkPad-T470-W10DG:~$ cd $SPARK_HOME
    yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/pyspark
    Python 2.7.15+ (default, Oct  7 2019, 17:39:04) 
    [GCC 7.4.0] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    19/12/01 20:04:03 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
    19/12/01 20:04:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    19/12/01 20:04:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    19/12/01 20:04:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
          /_/
    
    Using Python version 2.7.15+ (default, Oct  7 2019 17:39:04)
    SparkSession available as 'spark'.
    >>> from pyspark.sql import Row
    >>> # Creation of the list from where the RDD is going to be created
    ... acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000","SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56","SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
    >>> # Create the DataFrame
    ... acTransDF = sc.parallelize(acTransList).map(lambda trans: trans.split(",")).map(lambda p: Row(accNo=p[0],tranAmount=float(p[1]))).toDF()
    >>> # Register temporary view in the DataFrame for using it in SQL
    ... acTransDF.createOrReplaceTempView("trans")
    >>> # Print the structure of the DataFrame
    ... acTransDF.printSchema()
    root
     |-- accNo: string (nullable = true)
     |-- tranAmount: double (nullable = true)
    
    >>> # Show the first few records of the DataFrame
    ... acTransDF.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    >>> # Use SQL to create another DataFrame containing the good transaction records
    ... goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
    19/12/01 20:10:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    >>> # Register temporary table in the DataFrame for using it in SQL
    ... goodTransRecords.createOrReplaceTempView("goodtrans")
    >>> # Show the first few records of the DataFrame
    ... goodTransRecords.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    >>> # Use SQL to create another DataFrame containing the high value transaction records
    ... highValueTransRecords = spark.sql("SELECT accNo, tranAmount FROM goodtrans WHERE tranAmount > 1000")
    >>> # Show the first few records of the DataFrame
    ... highValueTransRecords.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10006|   10000.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    >>> # Use SQL to create another DataFrame containing the bad account records
    ... badAccountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'")
    >>> # Show the first few records of the DataFrame
    ... badAccountRecords.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    +-------+----------+
    
    >>> # Use SQL to create another DataFrame containing the bad amount records
    ... badAmountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0")
    >>> # Show the first few records of the DataFrame
    ... badAmountRecords.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|     -10.0|
    +-------+----------+
    
    >>> # Do the union of two DataFrames and create another DataFrame
    ... badTransRecords = badAccountRecords.union(badAmountRecords)
    >>> # Show the first few records of the DataFrame
    ... badTransRecords.show()
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    >>> # Calculate the sum
    ... sumAmount = spark.sql("SELECT sum(tranAmount)as sum FROM goodtrans")
    >>> # Show the first few records of the DataFrame
    ... sumAmount.show()
    +-------+
    |    sum|
    +-------+
    |28486.0|
    +-------+
    
    >>> # Calculate the maximum
    ... maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
    >>> # Show the first few records of the DataFrame
    ... maxAmount.show()
    +-------+
    |    max|
    +-------+
    |10000.0|
    +-------+
    
    >>> # Calculate the minimum
    ... minAmount = spark.sql("SELECT min(tranAmount)as min FROM goodtrans")
    >>> # Show the first few records of the DataFrame
    ... minAmount.show()
    +----+
    | min|
    +----+
    |30.0|
    +----+
    
    >>> # Use SQL to create another DataFrame containing the good account numbers
    ... goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
    >>> # Show the first few records of the DataFrame
    ... goodAccNos.show()
    +-------+                                                                       
    |  accNo|
    +-------+
    |SB10001|
    |SB10002|
    |SB10003|
    |SB10004|
    |SB10005|
    |SB10006|
    |SB10007|
    |SB10008|
    |SB10009|
    |SB10010|
    +-------+
    
    >>> # Calculate the sum using mixing of DataFrame and RDD like operations
    ... sumAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a+b)
    >>> sumAmountByMixing
    28486.0
    >>> # Calculate the maximum using mixing of DataFrame and RDD like operations
    ... maxAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a > b else b)
    >>> maxAmountByMixing
    10000.0
    >>> # Calculate the minimum using mixing of DataFrame and RDD like operations
    ... minAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a < b else b)
    >>> minAmountByMixing
    30.0
    >>> 
    

    2. Using the DataFrame API

    scala> case class Trans(accNo: String, tranAmount: Double)
    defined class Trans
    
    scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
    toTrans: Seq[String] => Trans
    
    scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
    acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, SB10010,7000, CR10001,7000, SB10002,-10)
    
    scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))
    acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:28
    
    scala> val acTransDF = spark.createDataFrame(acTransRDD)
    acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    
    
    scala> 
    
    scala> acTransDF.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> val goodTransRecords = acTransDF.filter("accNo like 'SB%'").filter("tranAmount > 0")
    goodTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> goodTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    
    scala> val highValueTransRecords = goodTransRecords.filter("tranAmount > 1000")
    highValueTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> highValueTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10006|   10000.0|
    |SB10010|    7000.0|
    +-------+----------+
    
    
    scala> val badAccountRecords = acTransDF.filter("accNo NOT like 'SB%'")
    badAccountRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> badAccountRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    +-------+----------+
    
    
    scala> val badAmountRecords = acTransDF.filter("tranAmount < 0")
    badAmountRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> badAmountRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> val badTransRecords = badAccountRecords.union(badAmountRecords)
    badTransRecords: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, tranAmount: double]
    
    scala> badTransRecords.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> // Calculate the aggregates in one shot
    
    scala> val aggregates = goodTransRecords.agg(sum("tranAmount"), max("tranAmount"), min("tranAmount"))
    aggregates: org.apache.spark.sql.DataFrame = [sum(tranAmount): double, max(tranAmount): double ... 1 more field]
    
    scala> aggregates.show
    +---------------+---------------+---------------+
    |sum(tranAmount)|max(tranAmount)|min(tranAmount)|
    +---------------+---------------+---------------+
    |        28486.0|        10000.0|           30.0|
    +---------------+---------------+---------------+
    
    
    scala> val goodAccNos = acTransDF.filter("accNo like 'SB%'").select("accNo").distinct().orderBy("accNo")
    goodAccNos: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string]
    
    scala> goodAccNos.show
    +-------+                                                                       
    |  accNo|
    +-------+
    |SB10001|
    |SB10002|
    |SB10003|
    |SB10004|
    |SB10005|
    |SB10006|
    |SB10007|
    |SB10008|
    |SB10009|
    |SB10010|
    +-------+
    
    
    scala> // Persist the data of the DataFrame into a Parquet file
    
    scala> acTransDF.write.parquet("scala.trans.parquet")
                                                                                    
    scala> // Read the data into a DataFrame from the Parquet file
    
    scala> val acTransDFfromParquet = spark.read.parquet("scala.trans.parquet")
    acTransDFfromParquet: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> acTransDFfromParquet.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10010|    7000.0|
    |CR10001|    7000.0|
    |SB10002|     -10.0|
    |SB10007|     500.0|
    |SB10008|      56.0|
    |SB10009|      30.0|
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10003|    8000.0|
    |SB10004|     400.0|
    |SB10005|     300.0|
    |SB10006|   10000.0|
    +-------+----------+
    
    
    scala> 
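    One note on the Parquet output: scala.trans.parquet is written as a directory of part files, and re-running the same write fails if that directory already exists. A save mode can be set to control this; a minimal sketch, assuming the same acTransDF:

    import org.apache.spark.sql.SaveMode

    // Overwrite (or Append/Ignore) instead of failing when the path already exists
    acTransDF.write.mode(SaveMode.Overwrite).parquet("scala.trans.parquet")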
    
    

    3. Understanding Aggregations in Spark SQL

    scala> case class Trans(accNo: String, tranAmount: Double)
    defined class Trans
    
    scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
    toTrans: Seq[String] => Trans
    
    scala> val acTransList = Array("SB10001,1000", "SB10002,1200","SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000","SB10004,500","SB10005,56", "SB10003,30","SB10002,7000","SB10001,-100", "SB10002,-10")
    acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)
    
    scala> // Create the DataFrame
    
    scala> val acTransDF = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_)).toDF()
    acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
    
    scala> acTransDF.show
    +-------+----------+
    |  accNo|tranAmount|
    +-------+----------+
    |SB10001|    1000.0|
    |SB10002|    1200.0|
    |SB10001|    8000.0|
    |SB10002|     400.0|
    |SB10003|     300.0|
    |SB10001|   10000.0|
    |SB10004|     500.0|
    |SB10005|      56.0|
    |SB10003|      30.0|
    |SB10002|    7000.0|
    |SB10001|    -100.0|
    |SB10002|     -10.0|
    +-------+----------+
    
    
    scala> acTransDF.createOrReplaceTempView("trans")
    
    scala> val acSummary = spark.sql("SELECT accNo, sum(tranAmount) as TransTotal FROM trans GROUP BY accNo")
    acSummary: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
    
    scala> acSummary.show
    +-------+----------+                                                            
    |  accNo|TransTotal|
    +-------+----------+
    |SB10005|      56.0|
    |SB10004|     500.0|
    |SB10003|     330.0|
    |SB10002|    8590.0|
    |SB10001|   18900.0|
    +-------+----------+
    
    
    scala> // Create the DataFrame using API for the account summary records
    
    scala> val acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg(sum("tranAmount") as "TransTotal")
    acSummaryViaDFAPI: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
    
    scala> acSummaryViaDFAPI.show
    +-------+----------+                                                            
    |  accNo|TransTotal|
    +-------+----------+
    |SB10005|      56.0|
    |SB10004|     500.0|
    |SB10003|     330.0|
    |SB10002|    8590.0|
    |SB10001|   18900.0|
    +-------+----------+
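    The agg call is not limited to a single aggregate per group. A small sketch extending the TransTotal example above with a few more per-account aggregates (the column aliases are illustrative):

    import org.apache.spark.sql.functions.{sum, max, min, count}

    // Several aggregates per account in a single pass over the data
    val acStats = acTransDF.groupBy("accNo").agg(
      sum("tranAmount").as("TransTotal"),
      max("tranAmount").as("MaxTrans"),
      min("tranAmount").as("MinTrans"),
      count("tranAmount").as("TransCount"))
    acStats.show()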
    
    
    

    4. Multi-datasource joining with Spark SQL

    In the previous chapter, joining multiple RDDs on a key was discussed. In this section, the same use case is implemented using Spark SQL. The use case chosen to illustrate joining multiple datasets on a key is as follows: the first dataset contains a retail banking master record summary with account number, first name, and last name; the second dataset contains the retail banking account balances with account number and balance amount. The key in both datasets is the account number. The task is to join the two datasets and create one dataset containing account number, first name, last name, and balance amount.

    scala> case class AcMaster(accNo: String, firstName: String, lastName: String)
    defined class AcMaster
    
    scala> case class AcBal(accNo: String, balanceAmount: Double)
    defined class AcBal
    
    scala> def toAcMaster = (master: Seq[String]) => AcMaster(master(0), master(1), master(2))
    toAcMaster: Seq[String] => AcMaster
    
    scala> def toAcBal = (bal: Seq[String]) => AcBal(bal(0), bal(1).trim.toDouble)
    toAcBal: Seq[String] => AcBal
    
    scala> val acMasterList = Array("SB10001,Roger,Federer","SB10002,Pete,Sampras","SB10003,Rafael,Nadal","SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
    acMasterList: Array[String] = Array(SB10001,Roger,Federer, SB10002,Pete,Sampras, SB10003,Rafael,Nadal, SB10004,Boris,Becker, SB10005,Ivan,Lendl)
    
    scala> val acBalList = Array("SB10001,50000","SB10002,12000","SB10003,3000", "SB10004,8500", "SB10005,5000")
    acBalList: Array[String] = Array(SB10001,50000, SB10002,12000, SB10003,3000, SB10004,8500, SB10005,5000)
    
    scala> val acMasterDF = sc.parallelize(acMasterList).map(_.split(",")).map(toAcMaster(_)).toDF()
    acMasterDF: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 1 more field]
    
    scala> val acBalDF = sc.parallelize(acBalList).map(_.split(",")).map(toAcBal(_)).toDF()
    acBalDF: org.apache.spark.sql.DataFrame = [accNo: string, balanceAmount: double]
    
    scala> acMasterDF.write.parquet("scala.master.parquet")
                                                                                    
    scala> acBalDF.write.json("scalaMaster.json")
    
    scala> val acMasterDFFromFile = spark.read.parquet("scala.master.parquet")
    acMasterDFFromFile: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 1 more field]
    
    scala> acMasterDFFromFile.createOrReplaceTempView("master")
    
    scala> val acBalDFFromFile = spark.read.json("scalaMaster.json")
    acBalDFFromFile: org.apache.spark.sql.DataFrame = [accNo: string, balanceAmount: double]
    
    scala> acBalDFFromFile.createOrReplaceTempView("balance")
    
    scala> val acDetail = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC")
    acDetail: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 2 more fields]
    
    scala> acDetail.show
    +-------+---------+--------+-------------+
    |  accNo|firstName|lastName|balanceAmount|
    +-------+---------+--------+-------------+
    |SB10001|    Roger| Federer|      50000.0|
    |SB10002|     Pete| Sampras|      12000.0|
    |SB10004|    Boris|  Becker|       8500.0|
    |SB10005|     Ivan|   Lendl|       5000.0|
    |SB10003|   Rafael|   Nadal|       3000.0|
    +-------+---------+--------+-------------+
    
    scala> // Use SQL to create another DataFrame containing the top 3 account detail records
    
    scala> val acDetailTop3 = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC").limit(3)
    acDetailTop3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [accNo: string, firstName: string ... 2 more fields]
    
    scala> acDetailTop3.show
    +-------+---------+--------+-------------+
    |  accNo|firstName|lastName|balanceAmount|
    +-------+---------+--------+-------------+
    |SB10001|    Roger| Federer|      50000.0|
    |SB10002|     Pete| Sampras|      12000.0|
    |SB10004|    Boris|  Becker|       8500.0|
    +-------+---------+--------+-------------+
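    The WHERE-clause join used above can also be expressed with explicit JOIN ... ON syntax, which makes the join condition stand out; a sketch against the same master and balance temp views:

    val acDetailJoin = spark.sql(
      """SELECT m.accNo, m.firstName, m.lastName, b.balanceAmount
        |FROM master m JOIN balance b ON m.accNo = b.accNo
        |ORDER BY b.balanceAmount DESC""".stripMargin)
    acDetailJoin.show()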
    
    

    This example can also be done using the DataFrame API.

    scala> val acDetailFromAPI = acMasterDFFromFile.join(acBalDFFromFile, acMasterDFFromFile("accNo") === acBalDFFromFile("accNo"), "inner").sort($"balanceAmount".desc).select(acMasterDFFromFile("accNo"), acMasterDFFromFile("firstName"), acMasterDFFromFile("lastName"),acBalDFFromFile("balanceAmount"))
    acDetailFromAPI: org.apache.spark.sql.DataFrame = [accNo: string, firstName: string ... 2 more fields]
    
    scala> acDetailFromAPI.show
    +-------+---------+--------+-------------+
    |  accNo|firstName|lastName|balanceAmount|
    +-------+---------+--------+-------------+
    |SB10001|    Roger| Federer|      50000.0|
    |SB10002|     Pete| Sampras|      12000.0|
    |SB10004|    Boris|  Becker|       8500.0|
    |SB10005|     Ivan|   Lendl|       5000.0|
    |SB10003|   Rafael|   Nadal|       3000.0|
    +-------+---------+--------+-------------+
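    A variation worth knowing: joining on a sequence of column names keeps a single accNo column in the result, so the explicit select of the parent columns above is not needed. A sketch:

    // Seq("accNo") joins on the common column and emits it only once
    val acDetailUsing = acMasterDFFromFile
      .join(acBalDFFromFile, Seq("accNo"))
      .sort($"balanceAmount".desc)
    acDetailUsing.show()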
    
    
    
