1. RDD 的概述
1.1 RDD 的优势
- 高效容错
- 可以控制数据的分区来优化计算性能
- 并行处理
- 提供了丰富的操作数据的 API
- 可以显式的将任何类型的中间结果存储在内存中
1.2 RDD 的方法
函数 |
功能 |
compute(split: Partition, context: TaskContext): Iterator[T] |
一个计算每个分区数据的函数 |
getPartitions: Array[Partition] |
一个分区列表,用于并行计算 |
getDependencies: Seq[Dependency[_]] = deps |
一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错) |
getPreferredLocations(split: Partition): Seq[String] = Nil |
分区数据的优先存储地址。 |
val partitioner: Option[Partitioner] = None |
RDD 是如何分区的,比如某个rdd是通过hash partitioner得到的 |
1.3 常见类型的 RDD
1.3.1 MapPartitionsRDD
- 将自定义的函数应用到父亲 RDD 一个分区到输出数据中去。
- 有且仅有一个窄依赖,即只依赖一个父亲 RDD。
- 分区器:可以选择是否保留父亲 RDD 的分区器。
- 计算分区列表:继承父亲 RDD 的分区列表。
1.3.2 ParallelCollectionRDD
1.3.3 ShuffleRDD
2. RDD 的创建
函数 |
功能 |
sc.textFile(path) |
从文件系统中(本地文件系统或 HDFS ),得到行数据的 HadoopRDD。 |
sc.sequenceFile[KeyClass,ValueClass](path) |
加载 HDFS sequenceFile 文件。 |
sc.parallelize(Seq(1,2,3)) |
从内存中已经存在的序列列表(Seq、List、Array)中,得到 ParallelCollectionRDD |
sc.makeRDD(Seq(1,2)) |
parallelize 别名 |
sc.range(start,end) |
创建区间为 [start,end) 的 MapPartitionsRDD |
3. 依赖
3.1 窄依赖
父亲 RDD 的一个分区数据只能被子 RDD 的一个分区消费,子 RDD 的一个分区可以对应父 RDD 的多个分区。
- OneToOneDependency:map、filter
- RangeDependency:union
- 本地性,一个父分区计算完,子分区计算。
- 失败后计算失败分区。
- 和父亲 RDD 的 Partitioner 相同,并且关联属性是分区键,则不发生 shuffle。
3.2 宽依赖
父亲 RDD 的一个分区数据被子 RDD 的多个分区消费。
- ShuffleDependency:reduceByKey
4. Partitioner
给这个RDD数据进行分区的分区器。
- 从存储系统创建的 RDD 不需要分区,HDFS 有多少数据块,就有多少分区。
- 非 key-value 结构没必要分区。
- key-value 结构需要分区,分区对象为 key。
4.1 实现
- KV 算子,作为参数传入
- 调用 RDD partitionBy 函数
- 调用 RDD coalesce 函数
- 调用 RDD repartition 函数
4.2 分区优化
- 对 RDD 预分区能提高性能。
- 是否保留父 RDD 的分区器,mapValue、flatMapValue 方法保留父 RDD 的分区器。
4.3 对比
HashPartitioner |
RangePartitioner |
|
将可以排序的 key 分到几个大概相等的范围分区内的一个分区中。 |
不支持 Array 类型的 key。 |
不支持不能排序的 Key。 |
可能导致数据倾斜。 |
可以解决分区数据倾斜的问题。 |
分区后的数据不会排序。 |
分区后分区之间的数据是有序的。 |
5. 单类型 RDD 操作 API
5.1 Transformations
方法 |
功能 |
map(func) |
接收函数,将函数应用到 RDD 中到每一个元素,返回新的 RDD。 |
mapPartition(func) |
类似于 map,但独立地在RDD的每一个分片上运行,因此在类型为 T 的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex |
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U] |
flatMap |
对每个输入元素,可以输出多个元素。 |
sample |
|
filter |
接收函数,返回只包含满足 filter 函数的元素的新 RDD。 |
distinct |
去重 |
5.2 Actions
方法 |
功能 |
collect |
返回 RDD 的所有元素。 |
count |
计数。 |
countByValue |
返回一个 Map,表示唯一元素出现的次数。 |
take |
返回几个元素。 |
top |
返回前几个元素。 |
takeOrdered |
返回基于提供的排序算法前几个元素。 |
takeSample(withReplacement,num,[seed]) |
取样 |
reduce |
合并 RDD 中的元素 |
fold |
与 reduce 相似,提供 zero value,rdd.flod(0)(+) |
aggregate() |
|
foreach() |
遍历 rdd 中的每个元素。 |
6. key-value 类型 RDD 操作 API
6.1 key-value 类型 RDD 生成方法
- 自定义
- map 函数生成。
- keyBy 函数生成。
- groupBy 函数生成,在 keyBy 的基础上,将 key 相同的元素进行聚合,基于 groupByKey 实现,相当于 keyBy+groupByKey。
6.2 KeyValue 对 RDD
函数 |
功能 |
combinerByKey |
|
aggregateByKey |
aggregateByKey((0,0))(mergeValue,mergeCombiner)可以实现 value 值、和词频统计功能。 |
reduceByKey |
createCombiner 不对数据进行任何处理,mergeValue 和 mergeCombiner 调用传入的 reduce 函数。 |
distinct |
基于 reduceByKey 实现,键值对都相同,则去重。reduce=((x,y)=>x) |
foldByKey(n) |
createCombiner=mergeValue(n,value) |
groupByKey |
createCombiner:元素转 ArrayBuffer 集合;mergeValue:将新元素添加进集合;mergeCombiner:集合合并。 |
sortByKey |
|
sortBy |
|
7. 二元操作 API
方法 |
功能 |
union |
并集 |
intersection |
交集 |
subtract |
差集 |
方法 |
功能 |
persist(StorageLevel) |
给 RDD 的 StorageLevel成员变量(默认为 None)赋值,存储级别:MEMORY_ONLY(默认)、DISK_ONLY、MEMORY_AND_DISK、OFF_HEAP。 |
cache |
分布式缓存,等于 cache(StorageLevel.MEMORY_ONLY)。 |
unpersist |
移除持久化数据。 |
ietrator |
获得当前 RDD 的输出。 |
localCheckpoint() |
本地磁盘文件,等于 cache(StorageLevel.MEMORY_AND_DISK)。 |
checkpoint() |
HDFS 文件系统。 |
8.集成 Spark SQL
8.1 集成步骤
8.1.1 SparkSession & SQLContext
val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc = new SparkContext(conf)
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
8.1.2 将外部数据映射为 RDD
val fileRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
// 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
val rowRDD: RDD[Row] = fileRDD.map(line => {
val fields = line.split(",")
Row(fields(0), fields(1).trim.toInt)
})
8.1.3 使用 class 或 StructType 封装数据
val peopleRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
.map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
// 将RDD 转换成 DataFrames
val df = peopleRDD.toDF
val structType: StructType = StructType(
//字段名,字段类型,是否可以为空
StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil
)
/**
* rows: java.util.List[Row],
* schema: StructType
* */
val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
8.1.4 使用 DataFrame 创建临时表
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
8.1.5 将 df 写入外部数据源
df.write.format("parquet").save("/Users/dreamaker/Downloads/data/people.parquet")
参考资料
网友评论