聚合函数combineByKey
将RDD[k,v]转化为RDD[k,c],利用该函数可以实现reduceByKey函数的功能。也可以实现类似于join的操作
参数简介
- createCombiner: V => C
处理每个分区数据时,如果遇到key没有出现的,就会创建一个该键对应的累加器初始值,每个分区相互独立。
- mergeValue: (C, V) => C
处理每个分区数据时,如果遇到key已经出现,则利用mergeValue进行合并处理。
- mergeCombiners: (C, C) => C
所有分区数据处理完成后,利用mergeCombiners对各个分区的累加器进行再次合并
实现reduceByKey函数
将List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0))中的数据按照key,对value做求和计算,顺带统计次数
val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)
val combReault = rdd.combineByKey(
score => (1, score),
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
)
//打印计算结果
combReault.collect().foreach(println)
//结果
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))
实现join操作
spark实现join操作非常简单 rddA.join(rddB)即可实现
def joinTest(sc:SparkContext): Unit ={
val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
(2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val rddB = sc.parallelize(List((1,"songshifan"),(2,"haiyang"),(3,"home")))
rddA.join(rddB).collect().foreach(println)}
//结果
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))
跟sql的left join类似
- 下面我们尝试使用spark sql来实现join操作
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* Created by songsf on 2017/7/4.
*/
object SparkSqlTest {
def main(args: Array[String]) {
val spark = SparkSession
.builder().master("local[*]")
.appName("Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val sc = spark.sparkContext
val rddA = sc.parallelize((List(("1", "www"), ("1", "iteblog"), ("1", "com"),
("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))).map(attributes => Row(attributes._1, attributes._2))
val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3", "home"))).map(attributes => Row(attributes._1, attributes._2))
val schemaString = "key name"
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val dataA = spark.createDataFrame(rddA, schema)
dataA.createOrReplaceTempView("dataA")
val dataB = spark.createDataFrame(rddB, schema)
dataB.createOrReplaceTempView("dataB")
dataA.show()
dataB.show()
val dataA_1 = spark.sql("select * from dataA where key = '1'").show()
val BLeftJoinA = spark.sql("select a.*,b.name name2 from dataA a left join dataB b on a.key = b.key").show()
spark.stop()
}
}
//结果
+---+-------+----------+
|key| name| name2|
+---+-------+----------+
| 3| good| home|
| 1| www|songshifan|
| 1|iteblog|songshifan|
| 1| com|songshifan|
| 2| bbs| haiyang|
| 2|iteblog| haiyang|
| 2| com| haiyang|
+---+-------+----------+
-
注意:在使用spark-session时,总是会报SparkSession类找不到的错误,这是因为我们的代码是运行在本地环境中,maven在打包的时候没有把Spark-session相关的内容打到我们的package中,这一点可以将编译好的jar包解压到相应的目录下找找看。
-
解决办法:在编辑器运行时,强制指定依赖的jar包。
-
疑问:之前测试过1.4版本的,写好的代码不把依赖jar包打入我们的jar包中,提交集群时会报错,所以1把所有依赖包都打入jar包中,2 在执行时用--jars参数去提交机器上找jar包。现在有一种说法是运行环境已经把依赖包都放在创建的执行器中,不必再加入依赖jar包。这个需要继续研究、测试。
网友评论