RDD编程
什么是RDD
RDD是Spark的基石,是实现Spark数据处理的核心抽象。RDD是一个抽象类,它代表一个不可变、可分区、里面元素可并行计算的集合。
RDD(Resilient Distributed Dataset)是Spark中的核心概念,它是一个容错、可以并行执行的分布式数据集
RDD包含五个特征
- 一个分区的列表
- 一个计算函数:compute,作用是对每个分区进行计算
- 记录对其他RDDs的依赖(宽依赖、窄依赖)列表
- 对于Key-value RDD来说,存在一个分区器(可选的)
- 对每个分区有一个优先位置的列表(可选的)
- 一组分片,即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD分片个数,如果没有指定,那么就会采用默认值。
- 一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到该目的。compute函数会对迭代器进行组合,不需要保存每次的计算结果。
- RDD之间存在依赖关系,RDD每次转换都会生成一个新的RDD,RDD之间形成类似于流水线一样的前后依赖关系(lineage)。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对所有的分区进行计算。
- 对于Key-Value的RDD来说,可能存在分区器(Partitioner)。Spark实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有key-value的RDD,才可能有Partitioner,非Key-Value的RDD的Partitioner的值是nono。Partitioner函数决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
- 一个列表,存储每个Parition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置,安装“移动计算不移动数据”的理念,会尽可能的将计算任务分配到其所要处理数据块的存储位置。
RDD特点
分区
RDD逻辑上时分区的,每个分区的数据时抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据,如果RDD是通过已有的文件系统构建,则Compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,而Compute函数是执行转换逻辑将其他RDD的数据进行转换。
分区.png
只读
RDD是只读的,想要改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。一个RDD转换为另一个RDD。通过丰富的操作算子(map,filter,union,join,reduceByKey等)实现,不像MR只能写Map和Reduce
只读.pngRDD的操作算子包含两类:
- transformation 用来对RDD进行转化,延迟执行(lazy)
- action 用来出发RDD计算,得到的计算结果或者将RDD保存到文件系统中
依赖
RDDs通过算子进行转换,转换得到的新的RDD包含了从其他RDDs衍生的所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种
- 窄依赖。RDDs之间分区是一一对应的(1:1 或 n:1)
- 宽依赖。子RDD的每个分区与父RDD的每个分区都有关,是多对多的关系(n:m)有Shuffle发生
缓存
可以控制存储级别(内存、磁盘等)进行缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到的分区数据,在后续其他地方用到该RDD的时候,会直接冲缓存处读取而不是在根据血缘关系计算,这样就加速的重用
缓存.pngcheckpoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续的迭代过程中出错,则需要通过非常长的血缘关系重建,势必影响性能。RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD就不需要知道他的父RDDs,他可以直接从checkpoint处拿到数据。
Spark编程模型
Spark编程模型.png- RDD表示数据对象
- 通过对象上的方法调用来对RDD进行转换
- 最终显示结果或将结果输出到外部数据源
- RDD转换算子称为Transformation是Lazy的(延迟执行)
- 需要遇到Action算子才会执行RDD操作
需要使用Spark,需要编写Driver程序,它被提交到集群运行
- Driver中定义了一个或者多个RDD,并调用RDD上的各种算子
- Worker则执行RDD分区计算任务
RDD创建
SparkContext
SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点。他负责整个集群的交互。如果把Spark集群当作服务端,那么Driver就是客户端,SparkContext是客户端的核心,SparkContext是Spark对外接口,负责向调用者提供Spark的各种功能。SparkContext用于连接Spark集群、创建RDD、累加器、广播变量。在Spark-shell中SparkContext已经创建好了,可直接使用,编写Spark Driver程序的第一件事就是创建SparkContext
RDD创建.png从集合创建RDD
从集合中创建RDD,主要用户测试,Spark提供一下函数:parallelize、makeRDD、range
/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
* @param seq Scala collection to distribute
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed collection
*/
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/**
* Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by
* `step` every element.
*
* @note if we need to cache this RDD, we should make sure each partition does not exceed limit.
*
* @param start the start value.
* @param end the end value.
* @param step the incremental step
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed range
*/
def range(
start: Long,
end: Long,
step: Long = 1,
numSlices: Int = defaultParallelism): RDD[Long] = withScope {
assertNotStopped()
// when step is 0, range will run infinitely
require(step != 0, "step cannot be 0")
val numElements: BigInt = {
val safeStart = BigInt(start)
val safeEnd = BigInt(end)
if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
(safeEnd - safeStart) / step
} else {
// the remainder has the same sign with range, could add 1 more
(safeEnd - safeStart) / step + 1
}
}
parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
if (bi.isValidLong) {
bi.toLong
} else if (bi > 0) {
Long.MaxValue
} else {
Long.MinValue
}
val safePartitionStart = getSafeMargin(partitionStart)
val safePartitionEnd = getSafeMargin(partitionEnd)
new Iterator[Long] {
private[this] var number: Long = safePartitionStart
private[this] var overflow: Boolean = false
override def hasNext =
if (!overflow) {
if (step > 0) {
number < safePartitionEnd
} else {
number > safePartitionEnd
}
} else false
override def next() = {
val ret = number
number += step
if (number < ret ^ step < 0) {
// we have Long.MaxValue + Long.MaxValue < Long.MaxValue
// and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
// back, we are pretty sure that we have an overflow.
overflow = true
}
ret
}
}
}
}
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
* @param seq Scala collection to distribute
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed collection
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(1 until 100)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
scala> rdd1.getNumPartitions
res2: Int = 5
scala> rdd1.partition
partitioner partitions
scala> rdd1.partitions.length
res3: Int = 5
scala> val rdd3 = sc.range(1,100,2)
rdd3: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24
scala> rdd3.collect
res4: Array[Long] = Array(1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99)
scala> val rdd4 = sc.parallelize(1 to 100,3)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd3.getNumPartitions
res5: Int = 5
scala> val rdd3 = sc.range(1,100,2,1)
rdd3: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[6] at range at <console>:24
scala> rdd3.getNumPartitions
res6: Int = 1
rdd.collect方法是聚合的意思,在生产中不要使用,会造成Driver OOM
从文件系统创建RDD
用textFile()方法来文件系统中加载数据创建RDD,方法将文件的URI作为参数,这个URI可以是:
- 本地文件系统
- 如果是本地文件系统,在Spark集群中,要注意是否每个服务器上同个目录下都有该文件
- 分布式文件系统HDFS的地址
- Amazon S3的地址
// 从本地文件系统加载数据
val lines = sc.textFile("file:///root/data/wc.txt")
// 从分布式文件系统加载数据
val lines = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat") val lines = sc.textFile("/user/root/data/uaction.dat")
val lines = sc.textFile("data/uaction.dat")
从RDD创建RDD
本质是将一个RDD转换为另一个RDD。详细信息参见 3.5 Transformation
Transformation
RDD的操作算子分为两类:
- Transformation,用来对RDD进行转化,这个操作是延时执行或者说是Lazy的,返回的是一个新的RDD。
- Action,用来触发RDD的计算,得到相关计算结果 或者 将结果保存的外部系统中,返回结果int、double、集合(不会返回新的RDD)
- 要很准确区分Transformation、Action
每一次Tranformation操作都会产生新的RDD,供给下一个“转化”使用,转化得到的RDD是惰性求值得,也就是说,整个转换过程只是记录了转换的轨迹,并不会真正的计算,只有遇到了Action操作时,才会发生真正的计算,开始从血缘关系(lineage)源头开始,进行物理的转换操作;
Transformation.png创建的Transformation:官网
常用转换算子1
-
map(func):对数据集中的每个元素都进行func操作,然后返回一个新的RDD。
-
filter(func):对数据集中的每个元素都进行func操作,然后执行func操作,执行为true的返回构成一个新的RDD
-
flatMap(func) 与map类似,和Scala中的flatMap一致,每个输入元素会被映射为0个或多个输出元素
-
mapPartitions(func):和map很像,但是map是将func作用在每个元素上,而mapPartitions是func作用在整个分区上。假设一个RDD有N个元素,M个分区,(N >> M),那么map的函数将会将会被调用N次,而mapParitions会被调用M次一次处理一个分区的数据
-
mapPartitionsWithIndex(func):与mapPartitions类似,多了分区的索引值的信息。
全都是窄依赖
scala> val rdd1 = sc.parallelize(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
scala> rdd1.map(_ * 2).collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200)
scala> rdd1.filter(_ > 50).collect
res2: Array[Int] = Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
scala> val rdd2 = sc.textFile("/azkaban-wc/wc.txt")
rdd2: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[4] at textFile at <console>:24
scala> rdd2.flatMap(_.split(" ")).collect
res4: Array[String] = Array(hadoop, mapreduce, yarn, hdfs, hadoop, mapreduce, mapreduce, yarn, lagou, lagou, lagou)
## mapPartitions
scala> val rdd1 = sc.makeRDD(1 to 20,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> rdd1.getNumPartitions
res0: Int = 3
scala> rdd1.mapPartitions(as => Iterator(as.toArray.mkString(":"))).collect
res5: Array[String] = Array(1:2:3:4:5:6, 7:8:9:10:11:12:13, 14:15:16:17:18:19:20)
scala> rdd1.mapPartitions(iter => Iterator(iter.toList)).collect
res8: Array[List[Int]] = Array(List(1, 2, 3, 4, 5, 6), List(7, 8, 9, 10, 11, 12, 13), List(14, 15, 16, 17, 18, 19, 20))
scala> rdd1.mapPartitions(iter => Iterator(iter.toArray.toBuffer)).collect
res9: Array[scala.collection.mutable.Buffer[Int]] = Array(ArrayBuffer(1, 2, 3, 4, 5, 6), ArrayBuffer(7, 8, 9, 10, 11, 12, 13), ArrayBuffer(14, 15, 16, 17, 18, 19, 20))
scala> rdd1.mapPartitionsWithIndex((i,iter) => Iterator(i + "|" + iter.toArray.toBuffer)).collect
res13: Array[String] = Array(0|ArrayBuffer(1, 2, 3, 4, 5, 6), 1|ArrayBuffer(7, 8, 9, 10, 11, 12, 13), 2|ArrayBuffer(14, 15, 16, 17, 18, 19, 20))
/**
* 第一个参数参数是一个方法,讲一个迭代器迭代成另一个迭代器
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
map 与 mapPartitions的区别
- map:每次处理一条数据
- mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足容易OOM
- 最佳时间,当内存资源充足时,建议使用mapPartitions,以提高处理效率
常用转换算子2
-
groupBy(func):按照传入函数的返回值进行分组,将key相同的值放入一个迭代器
-
glom():将每一个分区形成一个数组,形成新的RDD[Array[T]]
-
sample(withReplaceMent,fraction,seed):采样算子,以指定的随机种子(seed)随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
-
distinct([num]):对RDD元素去重后,返回一个新的RDD,可传入numTasks才参数改变RDD分区数
-
coalesce(numPartitions):缩减分区数,无shuffle
-
repartitions(numPartitions):增加或缩减分区数,有shuffle
-
sortBy(func,[ascending],[numTasks]):使用func对数据进行处理,对处理后的结果进行排序
宽依赖算子:groupBy、distinct、repartition、sortBy
scala> val rdd1 = sc.makeRDD(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
scala> rdd1
res21: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
scala> rdd1.collect
res22: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd1.mapPartitions(iter => iter.map(_ * 10))
res24: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at mapPartitions at <console>:26
scala> res24.collect
res25: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
scala> val rdd2 = rdd1.groupBy(_ % 3)
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[18] at groupBy at <console>:25
scala> rdd2.collect
res27: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 1, 4, 10)), (2,CompactBuffer(2, 5, 8)))
scala> rdd2.mapValues(_.map(_*2)).collect
res28: Array[(Int, Iterable[Int])] = Array((0,List(6, 12, 18)), (1,List(2, 20, 8, 14)), (2,List(10, 4, 16)))
scala> val rdd1 = sc.makeRDD(1 to 20,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd1.glom
res0: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[1] at glom at <console>:26
scala> res0.collect
res1: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12, 13), Array(14, 15, 16, 17, 18, 19, 20))
scala> rdd1.collect
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> val rdd1 = sc.makeRDD(1 to 111)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> rdd1.glom.collect
res3: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22), Array(23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44), Array(45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), Array(89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111))
scala> rdd1.glom.map(_.map(_ * 10)).collect
res4: Array[Array[Int]] = Array(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220), Array(230, 240, 250, 260, 270, 280, 290, 300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 410, 420, 430, 440), Array(450, 460, 470, 480, 490, 500, 510, 520, 530, 540, 550, 560, 570, 580, 590, 600, 610, 620, 630, 640, 650, 660), Array(670, 680, 690, 700, 710, 720, 730, 740, 750, 760, 770, 780, 790, 800, 810, 820, 830, 840, 850, 860, 870, 880), Array(890, 900, 910, 920, 930, 940, 950, 960, 970, 980, 990, 1000, 1010, 1020, 1030, 1040, 1050, 1060, 1070, 1080, 1090, 1100, 1110))
scala> val rdd1 = sc.makeRDD(1 to 111,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24
scala> rdd1.glom
res5: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[7] at glom at <console>:26
scala> rdd1.glom.collect
res6: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55), Array(56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111))
scala> rdd1.glom.getNumPartitions
res7: Int = 2
scala> val rdd3 = rdd1.glom.map(_.sliding(10,10))
rdd3: org.apache.spark.rdd.RDD[Iterator[Array[Int]]] = MapPartitionsRDD[11] at map at <console>:25
scala> rdd3.collect
res8: Array[Iterator[Array[Int]]] = Array(<iterator>, <iterator>)
# 将数组聚合到一起,然后对聚合到以前的数据进行遍历,下面的_ 表示的是Array[Int],然后对Array[Int]进行分割,步十个元素一组,步长为10
scala> val rdd3 = rdd1.glom.map(_.sliding(10,10).toArray)
rdd3: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[13] at map at <console>:25
scala> rdd3.collect
res10: Array[Array[Array[Int]]] = Array(Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55)), Array(Array(56, 57, 58, 59, 60, 61, 62, 63, 64, 65), Array(66, 67, 68, 69, 70, 71, 72, 73, 74, 75), Array(76, 77, 78, 79, 80, 81, 82, 83, 84, 85), Array(86, 87, 88, 89, 90, 91, 92, 93, 94, 95), Array(96, 97, 98, 99, 100, 101, 102, 103, 104, 105), Array(106, 107, 108, 109, 110, 111)))
scala> rdd3.getNumPartitions
res11: Int = 2
## 生成一个1 到 33的数组
scala> val rddx = (1 to 33).toArray
rddx: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
## 将数组进行分割,十个元素为一组,默认的步长为1,即第一个数组为1~10,第二个数组为2~11,1和2的步长为1
scala> rddx.sliding(10).toArray
res14: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), Array(3, 4, 5, 6, 7, 8, 9, 10, 11, 12), Array(4, 5, 6, 7, 8, 9, 10, 11, 12, 13), Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14), Array(6, 7, 8, 9, 10, 11, 12, 13, 14, 15), Array(7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(8, 9, 10, 11, 12, 13, 14, 15, 16, 17), Array(9, 10, 11, 12, 13, 14, 15, 16, 17, 18), Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(12, 13, 14, 15, 16, 17, 18, 19, 20, 21), Array(13, 14, 15, 16, 17, 18, 19, 20, 21, 22), Array(14, 15, 16, 17, 18, 19, 20, 21, 22, 23), Array(15, 16, 17, 18, 19, 20, 21, 22, 23, 24), Array(16, 17, 18, 19, 20, 21, 22, 23, 24, 25), Array(17, 18, 19, 20, 21, 22, 2...
## 将数组进行分割,十个元素为一组,默认的步长为5
scala> rddx.sliding(10,5).toArray
res15: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(6, 7, 8, 9, 10, 11, 12, 13, 14, 15), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(16, 17, 18, 19, 20, 21, 22, 23, 24, 25), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(26, 27, 28, 29, 30, 31, 32, 33))
## 将数组进行分割,十个元素为一组,默认的步长为10
scala> rddx.sliding(10,10).toArray
res16: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33))
# 对数据采样,先生成一个数组
scala> val rdd1 = sc.makeRDD(1 to 200)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
# 对数组进行采样,第一个参数为是否允许采样过的数据,再次被收集到,true为可以,第二个参数是取多少比例的数据,该数据是一个范围,不是准备的就是那么多,最后一个参数为算子,当算子相同的时候,每次采样的数据必定是相同的,该参数可以不写,则每次采样的数据就是完全随机的了。
scala> val rdd2 = rdd1.sample(true,0.1,10)
rdd2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[2] at sample at <console>:25
scala> rdd2.collect
res1: Array[Int] = Array(2, 4, 20, 20, 37, 60, 67, 81, 96, 106, 107, 156, 164, 166, 167, 172, 173, 174, 174, 185, 187, 188, 195, 197)
scala> rdd1.sample(true,0.1,10).collect
res2: Array[Int] = Array(2, 4, 20, 20, 37, 60, 67, 81, 96, 106, 107, 156, 164, 166, 167, 172, 173, 174, 174, 185, 187, 188, 195, 197)
scala> rdd1.sample(true,0.1,101).collect
res3: Array[Int] = Array(28, 32, 45, 53, 60, 94, 102, 120, 157, 162, 167, 170, 170, 183, 185, 200)
scala> rdd1.sample(true,0.1).collect
res4: Array[Int] = Array(13, 14, 36, 51, 55, 81, 83, 84, 88, 106, 106, 120, 127, 142, 145, 149, 158, 176, 188, 190)
scala> rdd1.sample(true,0.1).collect
res5: Array[Int] = Array(1, 12, 35, 39, 45, 57, 63, 80, 99, 107, 113, 117, 145, 160, 160, 161, 162, 170, 175, 185, 185, 186, 200)
scala> rdd1.sample(false,0.1,10).collect
res6: Array[Int] = Array(2, 5, 8, 10, 15, 16, 31, 34, 35, 37, 59, 71, 75, 81, 90, 108, 154, 164, 169, 179, 181, 183)
scala> rdd1.sample(false,0.1,10).collect
res7: Array[Int] = Array(2, 5, 8, 10, 15, 16, 31, 34, 35, 37, 59, 71, 75, 81, 90, 108, 154, 164, 169, 179, 181, 183)
scala> rdd1.sample(false,0.1).collect
res8: Array[Int] = Array(31, 40, 44, 67, 70, 107, 108, 112, 117, 125, 154, 163, 170, 174, 185, 187, 199)
## 测试去重,先生成一个random,然后生成随机数
scala> val random = scala.util.Random
random: util.Random.type = scala.util.Random$@6022cf7e
scala> val arr = (1 to 30).map(_ => random.nextInt(15))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 6, 7, 4, 5, 1, 4, 1, 8, 13, 14, 1, 0, 4, 1, 13, 7, 4, 12, 14, 12, 7, 4, 10, 4, 13, 5, 5, 2, 14)
scala> val rdd = sc.makeRDD(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:26
scala> rdd.collect
res0: Array[Int] = Array(1, 6, 7, 4, 5, 1, 4, 1, 8, 13, 14, 1, 0, 4, 1, 13, 7, 4, 12, 14, 12, 7, 4, 10, 4, 13, 5, 5, 2, 14)
scala> rdd.distinct.collect
res1: Array[Int] = Array(0, 10, 5, 1, 6, 7, 12, 2, 13, 8, 4, 14)
## 减少或增加分区数
# coalesce只能减少分区数,不能增加分区数
scala> val rdd2 = rdd1.distinct.coalesce(2)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[13] at coalesce at <console>:25
scala> rdd2.getNumPartitions
res4: Int = 2
scala> val rdd3 = rdd1.distinct.repartition(2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at repartition at <console>:25
scala> rdd3.getNumPartitions
res5: Int = 2
scala> val rdd3 = rdd1.distinct.repartition(6)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:25
scala> rdd3.getNumPartitions
res6: Int = 6
## 排序,默认升序,false为降序
scala> rdd1.sortBy(x => x).collect
res11: Array[Int] = Array(0, 1, 1, 1, 1, 2, 4, 4, 5, 5, 5, 6, 7, 7, 7, 7, 8, 8, 10, 10, 10, 10, 11, 11, 11, 13, 13, 13, 14, 14)
scala> rdd1.sortBy(x => x,false).collect
res12: Array[Int] = Array(14, 14, 13, 13, 13, 11, 11, 11, 10, 10, 10, 10, 8, 8, 7, 7, 7, 7, 6, 5, 5, 5, 4, 4, 2, 1, 1, 1, 1, 0)
coalesce 与 repartition的区别
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*
* TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
//调用coalesce,但是有shuffle,明确指定shuffle为true
coalesce(numPartitions, shuffle = true)
}
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
小结:
- repartition: 增大或减少分区数;有shuffle
- coalesce:一般用于减少分区数(此时无shuffle)
常见转换算子3
RDD之间的交、并、差算子:分别如下
- intersection(otherRDD) 交集
- union(otherRDD) 并集
- subtract(otherRDD) 差集,rdd1.subtract(rdd2),这里面指的是rdd1相比rdd2的差集
- cartesian(otherRDD):笛卡尔积
- zip(otherRDD):将两个RDD组合成key-value形式的RDD,默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
宽依赖的算子(shuffle):intersection、subtract、cartesian
scala> val rdd1 = sc.range(1,21)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
scala> val rdd2 = sc.range(10,31)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24
scala> rdd1.intersection(rdd2).collect
res0: Array[Long] = Array(15, 20, 10, 16, 11, 17, 12, 13, 18, 19, 14)
scala> rdd1.intersection(rdd2).collect.sorted
res1: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> rdd1.intersection(rdd2).sortBy(x => x).collect
res3: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> rdd1.intersection(rdd2).sortBy(x => x).collect
res3: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> rdd1.union(rdd2).sortBy(x => x).collect
res4: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18, 19, 19, 20, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
scala> rdd1.union(rdd2).distinct.sortBy(x => x).collect
res5: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
scala> rdd1.subtract(rdd2).sortBy(x => x).collect
res6: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> rdd1.intersection(rdd2).getNumPartitions
res7: Int = 5
scala> rdd1.union(rdd2).getNumPartitions
res8: Int = 10
scala> rdd1.subtract(rdd2).getNumPartitions
res9: Int = 5
scala> val rdd1 = sc.range(1,5)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[63] at range at <console>:24
scala> val rdd2 = sc.range(6,10)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[65] at range at <console>:24
scala> rdd1.cartesian(rdd2).collect
res10: Array[(Long, Long)] = Array((1,6), (1,7), (1,8), (1,9), (2,6), (2,7), (2,8), (2,9), (3,6), (3,7), (3,8), (3,9), (4,6), (4,7), (4,8), (4,9))
scala> rdd1.cartesian(rdd2).getNumPartitions
res11: Int = 25
scala> rdd1.zip(rdd2).collect
res12: Array[(Long, Long)] = Array((1,6), (2,7), (3,8), (4,9))
scala> rdd1.zip(rdd2).getNumPartitions
res13: Int = 5
scala> val rdd2 = sc.range(1,12)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[71] at range at <console>:24
scala> rdd1.zip(rdd2)
res14: org.apache.spark.rdd.RDD[(Long, Long)] = ZippedPartitionsRDD2[72] at zip at <console>:28
scala> rdd1.zip(rdd2).collect
20/10/23 14:17:58 WARN TaskSetManager: Lost task 2.0 in stage 30.0 (TID 182, 172.17.178.97, executor 0): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
at org.apache.spark.rdd.RDD$$anon$2.hasNext(RDD.scala:914)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.rdd.RDD$$anon$2.foreach(RDD.scala:910)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.rdd.RDD$$anon$2.to(RDD.scala:910)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.rdd.RDD$$anon$2.toBuffer(RDD.scala:910)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.rdd.RDD$$anon$2.toArray(RDD.scala:910)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:990)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
备注:
- union是窄依赖:得到的RDD分区数为两个RDD分区数之和
- cartesian是宽依赖:得到的RDD分区数为两个分区数之积。慎用
Action
Action是用来触发RDD的计算,得到相关计算结果;
Action触发job。一个Spark程序(Driver程序)包含了多少Action算子,那么就有多少个Job
典型的Action算子:collect(将数据聚合到一起)、count(统计RDD中数据的数量)
每个action的调用链路,例如:collect()
collect() => sc.runJob() => ... => dagScheduler.runJob() => 触发了Job
要求:能快速的区分Transformation、Action
源码:
/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
.....
/**
* Run an action job on the given RDD and pass all the results to the resultHandler function as
* they arrive.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @note Throws `Exception` when the job fails
*/
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
collect/collectAsMap()
stats/count/mean/stdev/max/min
reduce(func)/fold(func)/aggregate(func)
![fold And Aggregate](图片/fold And Aggregate.png)
first():取RDD的第一个元素
take(n):取RDD的前n个元素
top(n):按照默认或者指定排序规则,返回前n个元素
takeSample(withReplacement, num, [seed]):返回采样的数据
foreach(func) / foreachPartition(func):与map、mapPartitions类似,区别是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
# stats:返回统计信息的,只能作用于RDD[Double]类型上调用
scala> val rdd1 = sc.range(1,101)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
scala> rdd1.stats
res0: org.apache.spark.util.StatCounter = (count: 100, mean: 50.500000, stdev: 28.866070, max: 100.000000, min: 1.000000)
scala> val rdd2 =sc.range(1,101)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at range at <console>:24
# 不能调用
scala> rdd1.zip(rdd2).stats
<console>:28: error: value stats is not a member of org.apache.spark.rdd.RDD[(Long, Long)]
rdd1.zip(rdd2).stats
^
# count在各种类型的RDD上,均能调用
scala> rdd1.zip(rdd2).count
res2: Long = 100
# 获取第一个元素
scala> rdd1.zip(rdd2).first
res3: (Long, Long) = (1,1)
# 获取前十个元素
scala> rdd1.zip(rdd2).take(10)
res4: Array[(Long, Long)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10))
# 对rdd1中的元素求和
scala> rdd1.reduce(_ +_)
res5: Long = 5050
scala> rdd1.fold(0)(_+_)
res6: Long = 5050
scala> rdd1.fold(1)(_+_)
res7: Long = 5053
scala> rdd1.getNumPartitions
res8: Int = 2
scala> val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.getNumPartitions
res0: Int = 2
scala> rdd.reduce(_ + _)
res1: Int = 55
scala> rdd.reduce((x,y) =>{
| println(s"x:$x,y:$y")
| x+y
| })
x:6,y:7
x:13,y:8
x:21,y:9
x:30,y:10
x:1,y:2
x:3,y:3
x:6,y:4
x:10,y:5
x:40,y:15
res2: Int = 55
scala> rdd.fold(1)(_+_)
res3: Int = 58
#fold(初始值)(局部汇总和全局汇总要用到的函数)
scala> rdd.fold(1)((x,y) => {
| println(s"x:$x,y:$y")
| x + y
| })
x:1,y:1
x:2,y:2
x:4,y:3
x:7,y:4
x:11,y:5
x:1,y:6
x:7,y:7
x:14,y:8
x:22,y:9
x:31,y:10
x:1,y:16
x:17,y:41
res4: Int = 58
# aggregate(初始值)((局部汇总的函数),(全局汇总的函数))
scala> rdd.aggregate(1)(_+_,_+_)
res5: Int = 58
scala> rdd.aggregate(1)((x,y) =>{
| println(s"x:$x,y:$y")
| x+y
| }
| ,
| (a,b) => {
| println(s"a:$a,b:$b")
| a+b
| })
x:1,y:1
x:2,y:2
x:4,y:3
x:7,y:4
x:11,y:5
a:1,b:16
x:1,y:6
x:7,y:7
x:14,y:8
x:22,y:9
x:31,y:10
a:17,b:41
res6: Int = 58
Action.png
first、take(n)、top(n)
scala> val rdd = sc.range(1,101)
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
#取出第一个元素
scala> rdd.first
res0: Long = 1
# 取出前十个元素
scala> rdd.take(10)
res1: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
#倒序后,取出前20个元素
scala> rdd.top(20)
res3: Array[Long] = Array(100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81)
scala> val random = scala.util.Random
random: util.Random.type = scala.util.Random$@34f9275f
scala> var rdd1 = (1 to 100).map(x => random.nextInt(200))
rdd1: scala.collection.immutable.IndexedSeq[Int] = Vector(100, 97, 145, 12, 156, 82, 70, 67, 84, 152, 39, 10, 164, 138, 174, 176, 182, 111, 171, 129, 191, 145, 190, 114, 177, 193, 139, 163, 134, 188, 193, 14, 91, 66, 143, 91, 6, 39, 51, 86, 62, 179, 121, 16, 94, 76, 116, 193, 85, 155, 108, 176, 69, 92, 20, 121, 173, 94, 150, 93, 86, 59, 62, 40, 30, 162, 64, 43, 156, 176, 6, 41, 154, 53, 186, 47, 88, 10, 21, 181, 74, 112, 153, 1, 56, 11, 17, 42, 94, 182, 171, 194, 23, 16, 167, 165, 123, 133, 71, 198)
scala> val rdd = sc.makeRDD(rdd1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:26
scala> rdd.take(10)
res5: Array[Int] = Array(100, 97, 145, 12, 156, 82, 70, 67, 84, 152)
scala> rdd.top(10)
res6: Array[Int] = Array(198, 194, 193, 193, 193, 191, 190, 188, 186, 182)
#采样,第一个参数为是否返回,第二个位获取几个数据,第三个位随机算子
scala> rdd.takeSample(true,10)
res7: Array[Int] = Array(84, 143, 56, 129, 163, 11, 190, 176, 177, 12)
scala> rdd.takeSample(true,40)
res8: Array[Int] = Array(134, 167, 86, 94, 12, 177, 69, 139, 53, 30, 163, 181, 177, 94, 176, 134, 150, 30, 16, 133, 193, 121, 67, 163, 76, 182, 145, 16, 93, 91, 10, 123, 171, 163, 39, 111, 181, 10, 94, 150)
scala> rdd.takeSample(false,40)
res9: Array[Int] = Array(134, 121, 86, 108, 198, 111, 171, 6, 30, 155, 121, 116, 171, 176, 153, 145, 114, 1, 43, 74, 66, 56, 41, 177, 156, 47, 191, 88, 53, 179, 64, 176, 138, 129, 84, 194, 123, 70, 163, 93)
foreach(func)
scala> rdd.foreach(_+1)
scala> rdd.collect
res11: Array[Int] = Array(100, 97, 145, 12, 156, 82, 70, 67, 84, 152, 39, 10, 164, 138, 174, 176, 182, 111, 171, 129, 191, 145, 190, 114, 177, 193, 139, 163, 134, 188, 193, 14, 91, 66, 143, 91, 6, 39, 51, 86, 62, 179, 121, 16, 94, 76, 116, 193, 85, 155, 108, 176, 69, 92, 20, 121, 173, 94, 150, 93, 86, 59, 62, 40, 30, 162, 64, 43, 156, 176, 6, 41, 154, 53, 186, 47, 88, 10, 21, 181, 74, 112, 153, 1, 56, 11, 17, 42, 94, 182, 171, 194, 23, 16, 167, 165, 123, 133, 71, 198)
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd.getNumPartitions
res12: Int = 2
scala> rdd.foreach(x => {
| println(x+1)
| }
| )
2
3
4
5
6
7
8
9
10
11
foreachPartition(func)
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.foreachPartition
def foreachPartition(f: Iterator[Int] => Unit): Unit
scala>rdd.foreachPartition(iter => iter.foreach(println(_)))
[Stage 0:> (0 + 2) / 2]
1
2
3
4
5
6
7
8
9
10
saveAsTextFile
scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd.getNumPartitions
res2: Int = 2
#存放到HDFS中,一个分区就存一个文件,容易出现小文件问题(t1目录下就是两个文件)
scala> rdd.saveAsTextFile("/spark-test/t1")
#存放到HDFS中,一个分区就存一个文件,容易出现小文件问题(t1目录下就是一个文件)
scala> rdd.coalesce(1).saveAsTextFile("/spark-test/t2")
Key-Value RDD操作
RDD整体上分为Value类型和Key-Value类型,前面介绍的是Value类型RDD操作,实际使用更多的是key-value类型的RDD,也被称为pariRDD。value类型RDD的操作基本集中在RDD.scala中,key-value类型的RDD操作集中在PairRDDFunctions.scala中
Key-ValueRDD操作1.png前面介绍的大多数算子对Pair RDD都是有效的,Pair RDD还有属于自己的Transformation、Action算子
创建PairRDD
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val arr = (1 to 10).toArray
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val arr1 = arr.map(x =>(x,x * 10,x* 100))
arr1: Array[(Int, Int, Int)] = Array((1,10,100), (2,20,200), (3,30,300), (4,40,400), (5,50,500), (6,60,600), (7,70,700), (8,80,800), (9,90,900), (10,100,1000))
# rdd不是 Pari RDD
scala> val rdd = sc.makeRDD(arr1)
rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:26
scala> rdd.first
res1: (Int, Int, Int) = (1,10,100)
# res2 是Pari RDD
scala> rdd.map(x => (x._1,(x._2,x._3))).take(3)
res2: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)))
scala> rdd.map(x => (x._1,(x._2,x._3))).collectAsMap
res4: scala.collection.Map[Int,(Int, Int)] = Map(8 -> (80,800), 2 -> (20,200), 5 -> (50,500), 4 -> (40,400), 7 -> (70,700), 10 -> (100,1000), 1 -> (10,100), 9 -> (90,900), 3 -> (30,300), 6 -> (60,600))
Transformation操作
类似Map操作
mapValues/flatMapValues/keys/values,这些操作都可以使用map操作实现,是简化操作。
mapValues:直接操作map的values
flatMapValues:操作完values值后,在将值拉平
scala> val a = sc.parallelize(List((1,2),(3,4),(5,6)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> a.collect
res5: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
scala> a.collectAsMap
res6: scala.collection.Map[Int,Int] = Map(5 -> 6, 1 -> 2, 3 -> 4)
scala> rdd.map(x => (x._1,(x._2,x._3))).collect
res8: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)), (4,(40,400)), (5,(50,500)), (6,(60,600)), (7,(70,700)), (8,(80,800)), (9,(90,900)), (10,(100,1000)))
scala> val b = a.mapValues(x => ( 1 to x))
b: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[6] at mapValues at <console>:25
scala> b.collect
res9: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
scala> val c = a.map(x => (x._1,1 to x._2))
c: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[7] at map at <console>:25
scala> c.collect
res10: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
scala> val d = a.map{case(k,v) => (k,1 to v)}
d: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[8] at map at <console>:25
scala> d.collect
res11: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
scala> val e = a.flatMapValues(x => 1 to x)
e: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[9] at flatMapValues at <console>:25
scala> e.collect
res12: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
scala> val f = a.map(x => (x._1,1 to x._2)).collect
f: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
scala> val f = a.map(x => (x._1,1 to x._2)).flatMap{case (k ,v) => v.map(elem => (k,elem))}
f: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[12] at flatMap at <console>:25
scala> f.collect
res13: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
scala> f.keys.collect
res14: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
scala> f.values.collect
res15: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
scala> f.map{case(k,y) => k}
res16: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at <console>:26
scala> f.map{case(k,y) => k}.collect
res17: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
scala> f.map{case(k,v) => v}.collect
res19: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
scala> f.map(x=> x._1).collect
res20: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
scala> f.map{case(k,_) => k}
res21: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:26
scala> f.map{case(k,_) => k}.collect
res23: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
聚合操作
PariRDD(k,v)使用范围广,聚合
groupByKey/reduceByKey/foldByKey/aggregateByKey
combineByKey(OLD)/combineByKeyWithClassTag(NEW) => 底层的实现
subtractByKey:类似subtract,删掉RDD中健与other RDD中的健相同的元素
小案例:给定一组数据:("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16), 键值对的key表示图书名称,value表示某天图书销量。计算每个键对应的平均 值,也就是计算每种图书的每天平均销量。
scala> val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
# groupByKey
scala> rdd.groupByKey
res0: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[1] at groupByKey at <console>:26
scala> rdd.groupByKey.collect
res1: Array[(String, Iterable[Int])] = Array((scala,CompactBuffer(26, 24)), (spark,CompactBuffer(12, 15, 25, 23, 16)), (hadoop,CompactBuffer(26, 23, 16)))
scala> rdd.groupByKey.mapValues(x => (x.toArray.sum/x.size))
res5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at mapValues at <console>:26
scala> rdd.groupByKey.mapValues(x => (x.toArray.sum/x.size)).collect
res6: Array[(String, Int)] = Array((scala,25), (spark,18), (hadoop,21))
scala> rdd.groupByKey.map(x =>(x._1,x._2.sum) )
res7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:26
scala> rdd.groupByKey.map(x =>(x._1,x._2.sum / x._2.size) )
res8: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:26
scala> rdd.groupByKey.map(x =>(x._1,x._2.sum / x._2.size) ).collect
res9: Array[(String, Int)] = Array((scala,25), (spark,18), (hadoop,21))
scala> rdd.groupByKey.map(x =>(x._1,x._2.sum.toDouble / x._2.size) ).collect
res10: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.groupByKey.map{case (k,v) => (k,v.sum * 1.0 / v.size)}
res11: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[18] at map at <console>:26
scala> rdd.groupByKey.map{case (k,v) => (k,v.sum * 1.0 / v.size)}.collect
res12: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.groupByKey.mapValues(v => v.sum * 1.0 / v.size)
res13: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[22] at mapValues at <console>:26
scala> rdd.groupByKey.mapValues(v => v.sum * 1.0 / v.size).collect
res14: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.reduceByKey(_+_).collect
res16: Array[(String, Int)] = Array((scala,50), (spark,91), (hadoop,65))
scala> rdd.mapValues(x => (x,1)).collect
res0: Array[(String, (Int, Int))] = Array((spark,(12,1)), (hadoop,(26,1)), (hadoop,(23,1)), (spark,(15,1)), (scala,(26,1)), (spark,(25,1)), (spark,(23,1)), (hadoop,(16,1)), (scala,(24,1)), (spark,(16,1)))
# reduceByKey
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._1 + x._1))
res1: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[3] at reduceByKey at <console>:26
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._1 + x._1)).collect
res2: Array[(String, (Int, Int))] = Array((scala,(50,50)), (spark,(91,91)), (hadoop,(65,65)))
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).collect
res3: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2)
res4: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[10] at mapValues at <console>:26
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2).collect
res5: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.mapValues(x =>(x,1)).collect
res6: Array[(String, (Int, Int))] = Array((spark,(12,1)), (hadoop,(26,1)), (hadoop,(23,1)), (spark,(15,1)), (scala,(26,1)), (spark,(25,1)), (spark,(23,1)), (hadoop,(16,1)), (scala,(24,1)), (spark,(16,1)))
#foldByKey
scala> rdd.mapValues(x => (x,1)).foldByKey((0,0))((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2).collect
res10: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
aggregateByKey => 定义初值 + 分区内的聚合函数+分区间的聚合函数
rdd.mapValues(x => (x,1)).aggregateByKey((0,0))((x,y) =>(x._1 + y._1,y._2 + x._2),(a,b) =>(a._1 + b._1,a._2 + b._2)).mapValues(v => v._1.toDouble / v._2).collect
## 初值(这里是元组)可以与RDD元素类型(Int)可以不一致,此时的x就是(0,0)元组,y就是RDD的元素(Int)
scala> rdd.aggregateByKey((0,0))(
| (x,y) =>(x._1 + y,x._2 + 1),
| (a,b) => (a._1 + b._1 ,a._2 + b._2))
res29: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[30] at aggregateByKey at <console>:26
scala> rdd.aggregateByKey((0,0))(
| (x,y) =>(x._1 + y,x._2 + 1),
| (a,b) => (a._1 + b._1 ,a._2 + b._2)).collect
res30: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
scala> rdd.aggregateByKey((0,0))(
| (x,y) =>(x._1 + y,x._2 + 1),
| (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toD))
toDegrees toDouble
| (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toDouble / x._2))
res31: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[33] at mapValues at <console>:28
scala> rdd.aggregateByKey((0,0))(
| (x,y) =>(x._1 + y,x._2 + 1),
| (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toDouble / x._2)).collect
res32: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).collect
res34: Array[(String, scala.collection.mutable.ArrayBuffer[Int])] = Array((scala,ArrayBuffer(26, 24)), (spark,ArrayBuffer(12, 15, 25, 23, 16)), (hadoop,ArrayBuffer(26, 23, 16)))
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).mapValues(x => (x.sum.toDouble / x.size)).collect
res36: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
scala> rdd.aggregateByKey((0,0))((x,y) => (x._1 + y,x._2 + 1),(a,b) => (a._1 + b._1,a._2 + b._2)).collect
res37: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
# 分区内的合并和分区间的合并,可以采用不同的方式
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).mapValues(x => (x.sum.toDouble / x.size)).collect
res36: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
# 此时x就是(0,0)元组,y就是rdd的元素(int)
scala> rdd.aggregateByKey((0,0))((x,y) => (x._1 + y,x._2 + 1),(a,b) => (a._1 + b._1,a._2 + b._2)).collect
res37: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
# combineByKey
scala> rdd.combineByKey(
| (x:Int) => (x,1), # 初始值,相当于上面做的一个map(x => (x,1))的一个操作
| (x:(Int,Int),y:Int)=>(x._1 + y, x._2 +1), # 分区内的聚合
# 分区间的聚合
| (a:(Int,Int),b:(Int,Int)) => (a._1 + b._1,a._2 + b._2)).collect
res40: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
## subtractByKey
scala> val rdd1 = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
scala> val rdd2 = sc.makeRDD(Array(("spark", 100), ("hadoop", 300)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at makeRDD at <console>:24
scala> rdd1.subtractByKey(rdd2).collect
res42: Array[(String, Int)] = Array()
scala> val rdd = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at makeRDD at <console>:24
^
scala> val other = sc.makeRDD(Array(("a",10), ("b",20), ("c",30)))
other: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at makeRDD at <console>:24
scala> rdd.subtractByKey(other).collect
res44: Array[(String, Int)] = Array((d,5))
结论:效率相等的情况下,使用最熟悉的方法;groupByKey一般情况下,效率低,尽量少用。
初学者考虑实现,如果使用的groupByKey,寻找代替的算子实现。
为什么groupByKey的效率低
为什么groupByKey的效率低.pngReduceByKey 和 groupByKey的相同点和不同点:
相同点:
- 都作用于RDD[K,V]
- 都是根据key来进行分组聚合
- 默认分区数量不变,但是可以通过参数指定分区数量
不同点:
- groupByKey没有默认的聚合函数,得到的返回值类型是RDD[k,Iterable[v]]
- reduceByKey必须传聚合函数,返回值类型是RDD[k,聚合后的V]
- groupByKey.map() = reduceByKey
排序操作
sortByKey:sortByKey作用于PariRDD函数,对Key进行排序,在org.apache.spark.rdd.OrderedRDDFunctions 中实现:
排序操作.pngscala> val a = sc.makeRDD(List("wyp","spark","hadoop","123321","hive"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24 ^
scala> val b = sc.makeRDD(1 to a.count.toInt)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[2] at zip at <console>:27
scala> c.collect
res1: Array[(String, Int)] = Array((wyp,1), (spark,2), (hadoop,3), (123321,4), (hive,5))
scala> c.sortByKey().collect
res3: Array[(String, Int)] = Array((123321,4), (hadoop,3), (hive,5), (spark,2), (wyp,1))
scala> c.sortByKey(false).collect
res4: Array[(String, Int)] = Array((wyp,1), (spark,2), (hive,5), (hadoop,3), (123321,4))
join操作
cogroup/join/leftOuterJoin/rightOuterJoin/fullOuterJoin
源码:
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
练习:
scala> val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
#cogroup
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[3] at cogroup at <console>:27
scala> rdd3.collect
res0: Array[(Int, (Iterable[String], Iterable[String]))] = Array((4,(CompactBuffer(Flink),CompactBuffer(王五))), (6,(CompactBuffer(),CompactBuffer(冯七))), (2,(CompactBuffer(Hadoop),CompactBuffer())), (1,(CompactBuffer(Spark),CompactBuffer())), (3,(CompactBuffer(Kylin),CompactBuffer(李四))), (5,(CompactBuffer(),CompactBuffer(赵六))))
scala> rdd3.foreach(println)
(4,(CompactBuffer(Flink),CompactBuffer(王五)))
(6,(CompactBuffer(),CompactBuffer(冯七)))
(2,(CompactBuffer(Hadoop),CompactBuffer()))
(1,(CompactBuffer(Spark),CompactBuffer()))
(3,(CompactBuffer(Kylin),CompactBuffer(李四)))
(5,(CompactBuffer(),CompactBuffer(赵六)))
# leftOuterJoin
scala> rdd1.leftOuterJoin(rdd2).collect
res2: Array[(Int, (String, Option[String]))] = Array((4,(Flink,Some(王五))), (2,(Hadoop,None)), (1,(Spark,None)), (3,(Kylin,Some(李四))))
# rightOuterJoin
scala> rdd1.rightOuterJoin(rdd2).collect
res3: Array[(Int, (Option[String], String))] = Array((4,(Some(Flink),王五)), (6,(None,冯七)), (3,(Some(Kylin),李四)), (5,(None,赵六)))
# fullOuterJoin
scala> rdd1.fullOuterJoin(rdd2).collect
res4: Array[(Int, (Option[String], Option[String]))] = Array((4,(Some(Flink),Some(王五))), (6,(None,Some(冯七))), (2,(Some(Hadoop),None)), (1,(Some(Spark),None)), (3,(Some(Kylin),Some(李四))), (5,(None,Some(赵六))))
# join
scala> rdd1.join(rdd2).collect
res5: Array[(Int, (String, String))] = Array((4,(Flink,王五)), (3,(Kylin,李四)))
# join 实际上调用的就是cogroup,
scala> rdd1.cogroup(rdd2).flatMapValues( pair =>
| for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
| )
res8: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[18] at flatMapValues at <console>:28
测试上面的那个for循环代码:
def main(args: Array[String]): Unit = {
val arrayBuffer = scala.collection.mutable.Map[Int, (ArrayBuffer[String], ArrayBuffer[String])]()
// (4,(CompactBuffer(Flink),CompactBuffer(王五)))
// (6,(CompactBuffer(),CompactBuffer(冯七)))
// (2,(CompactBuffer(Hadoop),CompactBuffer()))
// (1,(CompactBuffer(Spark),CompactBuffer()))
// (3,(CompactBuffer(Kylin),CompactBuffer(李四)))
// (5,(CompactBuffer(),CompactBuffer(赵六)))
arrayBuffer(4) = (ArrayBuffer("Flink"), ArrayBuffer("王五"))
arrayBuffer(6) = (ArrayBuffer(), ArrayBuffer("冯七"))
arrayBuffer(2) = (ArrayBuffer("Hadoop"), ArrayBuffer())
arrayBuffer(1) = (ArrayBuffer("Spark"), ArrayBuffer())
arrayBuffer(3) = (ArrayBuffer("Kylin"), ArrayBuffer("李四"))
arrayBuffer(5) = (ArrayBuffer(), ArrayBuffer("赵六"))
val intToTuples = arrayBuffer.mapValues(v => for (i <- v._1; j <- v._2)
yield (i, j)
)
println(arrayBuffer)
println("*" * 15)
println(intToTuples)
}
输出:
Map(2 -> (ArrayBuffer(Hadoop),ArrayBuffer()), 5 -> (ArrayBuffer(),ArrayBuffer(赵六)), 4 -> (ArrayBuffer(Flink),ArrayBuffer(王五)), 1 -> (ArrayBuffer(Spark),ArrayBuffer()), 3 -> (ArrayBuffer(Kylin),ArrayBuffer(李四)), 6 -> (ArrayBuffer(),ArrayBuffer(冯七)))
***************
Map(2 -> ArrayBuffer(), 5 -> ArrayBuffer(), 4 -> ArrayBuffer((Flink,王五)), 1 -> ArrayBuffer(), 3 -> ArrayBuffer((Kylin,李四)), 6 -> ArrayBuffer())
然后再使用下面的输出去过,进行flatMap
Action操作
collectAsMap、countByKey、lookup(key)
collectAsMap:
scala> val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[4] at join at <console>:27
scala> rdd3.collectAsMap
res0: scala.collection.Map[Int,(String, String)] = Map(4 -> (Flink,王五), 3 -> (Kylin,李四))
countByKey
源码:
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
* @note This method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
def countByKey(): Map[K, Long] = self.withScope {
//将每个key对应的value设置为1,然后对相同的key进行聚合
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
演示
scala> rdd3.countByKey
res2: scala.collection.Map[Int,Long] = Map(4 -> 1, 3 -> 1)
lookup(key):高效的查找方法,只查找对应分区的数据(如果RDD有分区器的话)
源码:
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
* 如果改rdd有分区器,该方法会效率高,因为只查询一个分区,否则会查询所有的元素
*/
def lookup(key: K): Seq[V] = self.withScope {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
} : Seq[V]
val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
}
}
演示:
scala> val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"), ("3","Scala"),("1","Java")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[7] at makeRDD at <console>:24
scala> rdd1.lookup("1")
res3: Seq[String] = WrappedArray(Spark, Java)
输入与输出
文件输入与输出
文本文件
数据读取:textFi le(String) 可指定单个文件,支持通配符。但是这样对大量的小文件读取销量并不高,应该使用wholeTextFiles
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
* @note On some filesystems, `.../path/*` can be a more efficient way to read all files
* in a directory rather than `.../path/` or `.../path`
* @note Partitioning is determined by data locality. This may result in too few partitions
* by default.
*
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @return RDD representing tuples of file path and the corresponding file content
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[Text],
classOf[Text],
updateConf,
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}
返回值RDD[(String,String)],其中key是文件名称,value是文件内容。
数据保存:saveAsTextFile(String).指定的输出目录
CSV文件
读取CSV(Comma-Separated Values)/TSV(Tab-Separated Values)数据和读取json数据相似,都需要先把文件当作普通文本来读取数据,然后通过每一行进行解析,实现对CSV的读取。
CSV/TSV数据的输出也是需要将结构化RDD通过相关的库转换成字符串RDD。然后使用Spark的文本文件API写出去。
json文件
如果一个JSON文件一行就是一个JSON记录,那么可以通过将JSON文件党对文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
JSON数据的输出主要是通过输出之前将结构化数据组成的RDD转为字符串RDD,然后使用Spark文本文件API写出去
JSON文件的处理使用SparkSQL最为简洁
SequenceFile
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark有专门用来读取SequenceFile的接口,在SparkContext中,可以调用sequenceFile[keyClass,valueClass];
调用saveAsSequenceFile(Path)保存PairRDD,系统将键和值自动转为Writable类型
对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制
通过objectFile[k,v](path)接受一个路径,读取对象文件,返回对应的RDD,也可以个通过调用saveAsObjectFile[]实现对对象文件的输出,因为是序列化所以要指定类型
JDBC
见综合案例
算子综合应用案例
WordCount ----- Scala
备注:打包上传服务器运行
package com.hhb.spark.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-26 21:03
**/
object WordCountTest {
def main(args: Array[String]): Unit = {
//1 创建sc
// val sparkConf = new SparkConf().setMaster("local").setAppName("WordCountTest")
//打包到服务器时,不需要有master
val sparkConf = new SparkConf().setAppName("WordCountTest")
val sc = new SparkContext(sparkConf)
//2 读取文件
// val lines = sc.textFile("/Users/baiwang/myproject/spark/data/wc.dat")
//动态的,不能写死
val lines = sc.textFile(args(0))
//3 数据转换
//将每行数据展开
val words = lines.flatMap(line => line.split("\\s+"))
val wordMap = words.map(x => (x, 1))
val result: RDD[(String, Int)] = wordMap.reduceByKey(_ + _)
//4 输出结果
result.foreach(println(_))
//5 关闭sc
sc.stop()
// 打包到集群运行,local模式
// spark-submit --master local[*] --class com.hhb.spark.core.WordCountTest \
// original-ParseDateWork.jar /azkaban-wc/wc.txt
//明天要试一下打包到到standalone模式
//Yarn
// spark-submit --master yarn --class cn.lagou.sparkcore.WordCount \
// original-LagouBigData-1.0-SNAPSHOT.jar /wcinput/*
}
}
WordCount ---- Java
Spark提供了:Scala、Java、Python、R语言的API; 对 Scala 和 Java 语言的支持最好;
wordcount-java.pngpackage com.hhb.java.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-26 21:31
**/
public class JavaWordCount {
public static void main(String[] args) {
//创建sc
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//读取文件
JavaRDD<String> lines = sc.textFile("/Users/baiwang/myproject/spark/data/wc.dat");
//转换数据
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
JavaPairRDD<String, Integer> wordMap = words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> result = wordMap.reduceByKey((x, y) -> x + y);
// 输出结果
result.foreach(r -> System.err.println(r));
//关闭sc
sc.stop();
}
}
备注:
- Spark入口点:JavaSparkContext
- Value-RDD:JavaRDD,key-value RDD:JavaPairRDD
- JavaRDD和JavaPairRDD转化
- JavaRDD => JavaPairRDD: 通过mapToPair函数
- JavaPairRDD => JavaRDD: 通过map函数转化
- lambda表达式使用 ->
计算圆周率
使用蒙特卡洛思想
蒙特卡洛.pngpackage com.hhb.spark.core
import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-26 22:00
**/
object SparkPi {
/**
* 计算圆周率,使用蒙特卡洛法 : 4 /pi = N / n => pi = 4*n/N
*
* @param args
*/
def main(args: Array[String]): Unit = {
//1. 创建sc
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
//2. 读取数据
val N = 10000000
val slices: Int = if (args.length > 0) args(0).toInt else 10
val m = sc.makeRDD(1 to N, slices)
.map(_ => {
val (x, y) = (random, random)
if (x * x + y * y <= 1) 1 else 0
})
//3. 转换数据
val n = m.reduce(_ + _)
//4. 输出
println(s"pi : ${4.0 * n / N}")
// 关闭sc
sc.stop()
}
}
广告数据统计
数据格式:timestamp province city userid adid 时间点,省份,城市,用户,广告
需求:
- 统计每一个省份点击TOP3的广告
- 统计每一个省份每一个小时的Top3的广告
统计每一个省份点击TOP3的广告
package com.hhb.spark.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description: 需求1:统计每个省份点击 TOP3 的广告ID
* @author: huanghongbo
* @date: 2020-10-26 22:20
**/
object AdstatTest1 {
def main(args: Array[String]): Unit = {
//1. 创建sc
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
//2. 读取数据
val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/advert.log")
//3. 转换数据
// 字段:时间、省份、城市、用户、广告
val lineRDD: RDD[((String, String), Int)] = lines.map {
line => {
val lineArray: Array[String] = line.split("\\s+")
((lineArray(1), lineArray(4)), 1)
}
}
//(Henan,(5,2189))
//(Hebei,(7,2250))
//(Henan,(0,2237))
//(Jiangsu,(1,2166))
//(Henan,(7,2151))
//(Hebei,(8,2240))
//(Hunan,(7,2132))
//(Hunan,(0,2162))
//(Hubei,(7,2150))
val priRDD: RDD[(String, (String, Int))] = lineRDD.reduceByKey(_ + _).map {
case ((a, b), c) => (a, (b, c))
}
// (Hunan,CompactBuffer((7,2132), (0,2162), (4,2140), (8,2189), (2,2193), (9,2122), (3,2157), (1,2202), (6,2082), (5,2273)))
priRDD.groupByKey()
//(Hunan,List((5,2273), (1,2202), (2,2193), (8,2189), (0,2162), (3,2157), (4,2140), (7,2132), (9,2122), (6,2082)))
//对每个省的数据取所有的value,转换成list后排序,取前三个
.mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(3).map {
case (x, y) => x
}.mkString(":")).collect().foreach(println(_))
//4. 输出
// 关闭sc
sc.stop()
}
}
统计每一个省份每一个小时的Top3的广告
package com.hhb.spark.core
import java.time.{LocalDateTime, ZoneOffset}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description: 统计每一个省份每一个小时的 TOP3广告ID
* @author: huanghongbo
* @date: 2020-10-27 13:35
**/
object AdstatTest2 {
/**
* 数据格式:时间点 省份 城市 用户 广告
*
* @param args
*/
def main(args: Array[String]): Unit = {
//创建sc
val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))
//读取数据
val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/advert.log")
//数据转换
val wordRDD: RDD[((Int, String, String), Int)] = lines.map(line => {
val words = line.split("\\s+")
((LocalDateTime.ofEpochSecond(words(0).toLong / 1000, 0, ZoneOffset.ofHours(8)).getHour, words(1), words(4)), 1)
})
//如果key进行分区后聚合
wordRDD.reduceByKey(_ + _)
//进行数据格式转换
.map { case ((a, b, c), d) => ((a, b), (c, d)) }
//将相同的key进行聚合
.groupByKey()
//将根据value的第二个元素进行排序,然后去前三个
.mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(3))
.collect().foreach(println(_))
//关闭sc
sc.stop()
}
}
总结
总结.png如果将上面的两个都合并到一个main中,读取完数据lines后,先计算第一个需求,然后再使用lines计算第二个需求,那么是读取几次数据文件呢?
回答:两次
找共同好友
Super WordCount
要求:将单词全部转换为小写,去除标点符号(难),去除停用词(难);最后按照 count 值降序保存到文件,同时将全部结果保存到MySQL(难);标点符号和停用词可 以自定义。
停用词:语言中包含很多功能词。与其他词相比,功能词没有什么实际含义。最普遍 的功能词是限定词,介词(on、in、to、from、 over等)、代词、数量词等。
Array[(String, Int)] => scala jdbc => MySQL
第一个版本
package com.hhb.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-27 17:41
**/
object SuperWordCount {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
val lines = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
val list = "and of see the to a in".split("\\s+")
val p = """[()\\?\\.,:;'’”“!\\? ]"""
lines.flatMap(_.split("\\s+"))
.map(word => {
word.toLowerCase()
.replaceAll(p, "")
}).filter(word => word.trim.length > 0 && !list.contains(word))
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.collect().foreach(println(_))
}
}
第二个版本:
package com.hhb.spark.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-29 10:46
**/
object SuperWordCount1 {
private val p = """[()\\?\\.,:;'’”“!\\? ]"""
private val list = "and of see the to a in".split("\\s+")
private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
private val userName = "hive"
private val password = "12345678"
private val sql = "insert into test (wort,total) values(?,?);"
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
lines.flatMap(_.split("\\s+"))
.map(x => {
x.toLowerCase.replaceAll(p, "")
})
.filter(word => word.trim.length > 0 && !list.contains(word))
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.foreach { case (k, v) => {
var conn: Connection = null
var st: PreparedStatement = null
try {
conn = DriverManager.getConnection(url, userName, password)
st = conn.prepareStatement(sql)
st.setString(1, k)
st.setInt(2, v)
st.executeUpdate()
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (st != null) st.close()
if (conn != null) conn.close()
}
}
}
}
}
scala链接JDBC
package com.hhb.spark.core
import java.sql.{Connection, DriverManager, PreparedStatement}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-28 21:01
**/
object JDBCDemo {
def main(args: Array[String]): Unit = {
val list = "a b c d e f g".split("\\s+").zipWithIndex
list.foreach(println(_))
val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
val userName = "hive"
val password = "12345678"
val sql = "insert into test (wort,total) values(?,?);"
var conn: Connection = null
var st: PreparedStatement = null
try {
conn = DriverManager.getConnection(url, userName, password)
st = conn.prepareStatement(sql)
list.foreach(w => {
st.setString(1, w._1)
st.setInt(2, w._2)
st.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (st != null) st.close()
if (conn != null) conn.close()
}
}
}
pom
<!-- JDBC -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
第三个版本
package com.hhb.spark.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-29 10:46
**/
object SuperWordCount2 {
private val p = """[()\\?\\.,:;'’”“!\\? ]"""
private val list = "and of see the to a in".split("\\s+")
private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
private val userName = "hive"
private val password = "12345678"
private val sql = "insert into test (wort,total) values(?,?);"
private var count = 0;
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
lines.flatMap(_.split("\\s+"))
.map(x => {
x.toLowerCase.replaceAll(p, "")
})
.filter(word => word.trim.length > 0 && !list.contains(word))
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.foreachPartition(iter => {
var conn: Connection = null
var st: PreparedStatement = null
conn = DriverManager.getConnection(url, userName, password)
st = conn.prepareStatement(sql)
count += 1
println(count)
try {
iter.foreach { case (k, v) => {
st.setString(1, k)
st.setInt(2, v)
st.executeUpdate()
}
}
} catch {
case e: Exception => e.printStackTrace()
}
finally {
if (st != null) st.close()
if (conn != null) conn.close()
}
}
)
}
}
第四个版本
package com.hhb.spark.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-29 10:46
**/
object SuperWordCount3 {
private val p = """[()\\?\\.,:;'’”“!\\? ]"""
private val list = "and of see the to a in".split("\\s+")
private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
private val userName = "hive"
private val password = "12345678"
private val sql = "insert into test (wort,total) values(?,?);"
private var count = 0;
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
val result = lines.flatMap(_.split("\\s+"))
.map(x => {
x.toLowerCase.replaceAll(p, "")
})
.filter(word => word.trim.length > 0 && !list.contains(word))
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
result.saveAsTextFile("/Users/baiwang/myproject/spark/data/super")
result.foreachPartition(iter => insert(iter))
}
def insert(iter: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var st: PreparedStatement = null
conn = DriverManager.getConnection(url, userName, password)
st = conn.prepareStatement(sql)
count += 1
println(count)
try {
iter.foreach { case (k, v) => {
st.setString(1, k)
st.setInt(2, v)
st.executeUpdate()
}
}
} catch {
case e: Exception => e.printStackTrace()
}
finally {
if (st != null) st.close()
if (conn != null) conn.close()
}
}
}
总结:最终优化版本使用foreachPartition 代替 foreach
备注:
- SparkSQL有方便读写MySQL的方法,给参数直接调用即可
- 但是掌握以上方法非常有必要,因为SparkSQL不支持所有类型的数据库
网友评论