美文网首页
spark combineByKey常用的数据操作

spark combineByKey常用的数据操作

作者: 艾七 | 来源:发表于2017-07-05 01:36 被阅读0次

    聚合函数combineByKey

    将RDD[k,v]转化为RDD[k,c],利用该函数可以实现reduceByKey函数的功能。也可以实现类似于join的操作

    参数简介

    • createCombiner: V => C

    处理每个分区数据时,如果遇到key没有出现的,就会创建一个该键对应的累加器初始值,每个分区相互独立。

    • mergeValue: (C, V) => C

    处理每个分区数据时,如果遇到key已经出现,则利用mergeValue进行合并处理。

    • mergeCombiners: (C, C) => C

    所有分区数据处理完成后,利用mergeCombiners对各个分区的累加器进行再次合并

    实现reduceByKey函数

    将List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0))中的数据按照key,对value做求和计算,顺带统计次数

    val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
    type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)
    val combReault = rdd.combineByKey(
      score => (1, score),
      (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
      (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
    )
    //打印计算结果
    combReault.collect().foreach(println)
    //结果
    (A,(2,101.0))
    (B,(2,94.0))
    (C,(1,91.0))
    

    实现join操作

    spark实现join操作非常简单 rddA.join(rddB)即可实现

    def joinTest(sc:SparkContext): Unit ={
    val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
      (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
    val rddB = sc.parallelize(List((1,"songshifan"),(2,"haiyang"),(3,"home")))
    rddA.join(rddB).collect().foreach(println)}
    //结果
    (1,(www,songshifan))
    (1,(iteblog,songshifan))
    (1,(com,songshifan))
    (2,(bbs,haiyang))
    (2,(iteblog,haiyang))
    (2,(com,haiyang))
    (3,(good,home))
    

    跟sql的left join类似

    • 下面我们尝试使用spark sql来实现join操作
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    /**
    * Created by songsf on 2017/7/4.
    */
    object SparkSqlTest {
        def main(args: Array[String]) {
        val spark = SparkSession
                    .builder().master("local[*]")
                    .appName("Spark SQL data sources example")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate()
        val sc = spark.sparkContext
        val rddA = sc.parallelize((List(("1", "www"), ("1", "iteblog"), ("1", "com"),
        ("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))).map(attributes =>         Row(attributes._1, attributes._2))
        val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3",              "home"))).map(attributes => Row(attributes._1, attributes._2))
        val schemaString = "key name"
        val fields = schemaString.split(" ")
        .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
        val dataA = spark.createDataFrame(rddA, schema)
        dataA.createOrReplaceTempView("dataA")
        val dataB = spark.createDataFrame(rddB, schema)
        dataB.createOrReplaceTempView("dataB")
        dataA.show()
        dataB.show()
        val dataA_1 = spark.sql("select * from dataA where key = '1'").show()
        val BLeftJoinA = spark.sql("select a.*,b.name name2 from dataA a left join dataB b on a.key = b.key").show()
        spark.stop()
        }
    }
    //结果
    +---+-------+----------+
    |key|   name|     name2|
    +---+-------+----------+
    |  3|   good|      home|
    |  1|    www|songshifan|
    |  1|iteblog|songshifan|
    |  1|    com|songshifan|
    |  2|    bbs|   haiyang|
    |  2|iteblog|   haiyang|
    |  2|    com|   haiyang|
    +---+-------+----------+
    
    • 注意:在使用spark-session时,总是会报SparkSession类找不到的错误,这是因为我们的代码是运行在本地环境中,maven在打包的时候没有把Spark-session相关的内容打到我们的package中,这一点可以将编译好的jar包解压到相应的目录下找找看。

    • 解决办法:在编辑器运行时,强制指定依赖的jar包。

    • 疑问:之前测试过1.4版本的,写好的代码不把依赖jar包打入我们的jar包中,提交集群时会报错,所以1把所有依赖包都打入jar包中,2 在执行时用--jars参数去提交机器上找jar包。现在有一种说法是运行环境已经把依赖包都放在创建的执行器中,不必再加入依赖jar包。这个需要继续研究、测试。

    相关文章

      网友评论

          本文标题:spark combineByKey常用的数据操作

          本文链接:https://www.haomeiwen.com/subject/besfhxtx.html