
005 MapReduce and RDD Join

Author: 逸章 | Published 2019-11-30 15:55

    1. MapReduce

    This example demonstrates MapReduce-style data processing. The use case is as follows:

    1. The retail banking transaction records arrive as comma-separated strings
       containing an account number and a transaction amount.
    2. Map each transaction to a key/value pair of the form (AccNo, TranAmount).
    3. Reduce by account number to get an account-level summary of all transactions,
       i.e. the balance of each account.

    1.1 Using spark-shell

    scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10")
    acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)
    
    scala> val acTransRDD = sc.parallelize(acTransList)
    acTransRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
    
    scala> val acKeyVal = acTransRDD.map(trans => (trans.split(",")(0), trans.split(",")(1).toDouble))
    acKeyVal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[1] at map at <console>:25
    
    scala> val accSummary = acKeyVal.reduceByKey(_ + _).sortByKey()
    accSummary: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[5] at sortByKey at <console>:25
    
    scala> accSummary.collect()
    res0: Array[(String, Double)] = Array((SB10001,18900.0), (SB10002,8590.0), (SB10003,330.0), (SB10004,500.0), (SB10005,56.0))
    
    scala> 
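
    For reference, the same pipeline can also be packaged as a self-contained Spark application instead of being typed into the shell. The sketch below is only an illustration: the object name AccountSummaryApp and the local[*] master URL are assumptions, not part of the original session.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch of the shell session above as a standalone application.
    // Object name and master URL are illustrative assumptions.
    object AccountSummaryApp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Account Summary")
          .master("local[*]") // assumption: run locally; normally set via spark-submit
          .getOrCreate()
        val sc = spark.sparkContext

        val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10001,8000",
          "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500",
          "SB10005,56", "SB10003,30", "SB10002,7000", "SB10001,-100", "SB10002,-10")

        // (AccNo, TranAmount) pairs, summed per account and sorted by account number
        val accSummary = sc.parallelize(acTransList)
          .map(trans => (trans.split(",")(0), trans.split(",")(1).toDouble))
          .reduceByKey(_ + _)
          .sortByKey()

        accSummary.collect().foreach(println)
        spark.stop()
      }
    }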
    

    1.2 The same functionality in pyspark

    yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./sbin/start-all.sh 
    starting org.apache.spark.deploy.master.Master, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.master.Master-1-yay-ThinkPad-T470-W10DG.out
    localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.worker.Worker-1-yay-ThinkPad-T470-W10DG.out
    yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/pyspark
    Python 2.7.15+ (default, Oct  7 2019, 17:39:04) 
    [GCC 7.4.0] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    19/11/30 20:10:56 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
    19/11/30 20:10:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    19/11/30 20:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
          /_/
    
    Using Python version 2.7.15+ (default, Oct  7 2019 17:39:04)
    SparkSession available as 'spark'.
    >>> from decimal import Decimal
    >>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10"]
    >>> acTransRDD = sc.parallelize(acTransList)
    >>> acKeyVal = acTransRDD.map(lambda trans: (trans.split(",")[0],Decimal(trans.split(",")[1])))
    >>> accSummary = acKeyVal.reduceByKey(lambda a,b : a+b).sortByKey()
    >>> accSummary.collect()                                                        
    [('SB10001', Decimal('18900')), ('SB10002', Decimal('8590')), ('SB10003', Decimal('330')), ('SB10004', Decimal('500')), ('SB10005', Decimal('56'))]
    >>> 
    
    

    2. RDD Join

    Joining two datasets on a common key produces a new dataset. Here the account master records are joined with the account balance records on the account number.

    scala> val acMasterList = Array("SB10001,Roger,Federer", "SB10002,Pete,Sampras", "SB10003,Rafael,Nadal", "SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
    acMasterList: Array[String] = Array(SB10001,Roger,Federer, SB10002,Pete,Sampras, SB10003,Rafael,Nadal, SB10004,Boris,Becker, SB10005,Ivan,Lendl)
    
    scala> val acBalList = Array("SB10001,50000", "SB10002,12000", "SB10003,3000", "SB10004,8500", "SB10005,5000")
    acBalList: Array[String] = Array(SB10001,50000, SB10002,12000, SB10003,3000, SB10004,8500, SB10005,5000)
    
    scala> val acMasterRDD = sc.parallelize(acMasterList)
    acMasterRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
    
    scala> val acBalRDD = sc.parallelize(acBalList)
    acBalRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26
    
    scala> val acMasterTuples = acMasterRDD.map(master => master.split(",")).map(masterList => (masterList(0), masterList(1) + " " + masterList(2)))
    acMasterTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:25
    
    scala> val acBalTuples = acBalRDD.map(trans => trans.split(",")).map(transList => (transList(0), transList(1)))
    acBalTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[5] at map at <console>:25
    
    scala> val acJoinTuples = acMasterTuples.join(acBalTuples).sortByKey().map{case (accno, (name, amount)) => (accno, name,amount)}
    acJoinTuples: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[12] at map at <console>:27
    
    scala> acJoinTuples.collect()
    res0: Array[(String, String, String)] = Array((SB10001,Roger Federer,50000), (SB10002,Pete Sampras,12000), (SB10003,Rafael Nadal,3000), (SB10004,Boris Becker,8500), (SB10005,Ivan Lendl,5000))
    
    scala> 
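
    The join above is an inner join: only account numbers present in both RDDs appear in the result. Pair RDDs also provide leftOuterJoin, rightOuterJoin, and fullOuterJoin, which keep unmatched keys and wrap the possibly missing side in Option. A minimal sketch, reusing acMasterTuples and acBalTuples from above; the extra record ("SB10006", "2000") is an assumption added only to demonstrate an unmatched key.

    // Assumption: an extra balance record with no matching master record
    val extraBalTuples = acBalTuples.union(sc.parallelize(Array(("SB10006", "2000"))))

    // leftOuterJoin keeps every key of the left RDD; the right value becomes Option[String]
    val leftJoined = acMasterTuples.leftOuterJoin(extraBalTuples)

    // fullOuterJoin keeps keys from both sides, wrapping both values in Option
    val fullJoined = acMasterTuples.fullOuterJoin(extraBalTuples)

    leftJoined.collect().foreach(println)  // e.g. (SB10001,(Roger Federer,Some(50000)))
    fullJoined.collect().foreach(println)  // the unmatched key appears as (SB10006,(None,Some(2000)))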
    
    

    A few more Spark actions, demonstrated on the RDDs built above:

    scala> val acNameAndBalance = acJoinTuples.map{case (accno, name,amount) => (name,amount)}
    acNameAndBalance: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[13] at map at <console>:25
    
    scala> val acTuplesByAmount = acBalTuples.map{case (accno, amount) => (amount.toDouble, accno)}.sortByKey(false)
    acTuplesByAmount: org.apache.spark.rdd.RDD[(Double, String)] = ShuffledRDD[17] at sortByKey at <console>:25
    
    scala> acTuplesByAmount.first()
    res7: (Double, String) = (50000.0,SB10001)
    
    scala> acTuplesByAmount.take(3)
    res8: Array[(Double, String)] = Array((50000.0,SB10001), (12000.0,SB10002), (8500.0,SB10004))
    
    scala> acBalTuples.countByKey()
    res9: scala.collection.Map[String,Long] = Map(SB10001 -> 1, SB10005 -> 1, SB10004 -> 1, SB10002 -> 1, SB10003 -> 1)
    
    scala> acBalTuples.count()
    res10: Long = 5
    
    scala> acNameAndBalance.foreach(println)
    (Boris Becker,8500)
    (Rafael Nadal,3000)
    (Roger Federer,50000)
    (Pete Sampras,12000)
    (Ivan Lendl,5000)
    
    scala> val balanceTotal = sc.accumulator(0.0, "Account Balance Total")
    warning: there were two deprecation warnings; re-run with -deprecation for details
    balanceTotal: org.apache.spark.Accumulator[Double] = 0.0
    
    scala> acBalTuples.map{case (accno, amount) => amount.toDouble}.foreach(bal => balanceTotal += bal)
    
    scala> balanceTotal.value
    res13: Double = 78500.0
    
    scala> 
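
    The sc.accumulator call above is deprecated in Spark 2.x, which is what the warning in the output refers to. The replacement is the AccumulatorV2 API, available through helpers such as sc.doubleAccumulator. A minimal sketch of the same total with the newer API (the variable name balanceTotal2 is just an illustrative choice):

    // Non-deprecated accumulator API (AccumulatorV2) in Spark 2.x
    val balanceTotal2 = sc.doubleAccumulator("Account Balance Total")
    acBalTuples.map { case (accno, amount) => amount.toDouble }
      .foreach(bal => balanceTotal2.add(bal))
    balanceTotal2.value  // expected to be 78500.0, matching the result above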
    
