本文内容是 是学习 范东来《Spark 课程》 笔记
- RDD 不可变, 只读,经过变化会生成新的对象
弹性 表现在2个方面:
- 任意一个节点出错时可以通过依赖关系恢复
- 通过transformation 时分区可能发生变化
在 Spark 源码中,RDD 是一个抽象类
每个 RDD 都有如下几个成员:
分区的集合;
用来基于分区进行计算的函数(算子);
依赖(与其他 RDD)的集合;
对于键-值型的 RDD 的散列分区器(可选);
对于用来计算出每个分区的地址集合(可选,如 HDFS 上的块存储的地址)。
参考RDD 的源码
// 表示RDD之间的依赖关系的成员变量
@transient private var deps: Seq[Dependency[_]]
// 分区器成员变量
@transient val partitioner: Option[Partitioner] = None
// 该RDD所引用的分区集合成员变量
@transient private var partitions_ : Array[Partition] = null
// 得到该RDD与其他RDD之间的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 得到该RDD所引用的分区
protected def getPartitions: Array[Partition]
// 得到每个分区地址
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// distinct算子
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
创建 RDD
- 并行集合;(RDD 纯粹是为了学习,将内存中的集合变量转换为 RDD,没太大实际意义)
val a = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1 = sc.parallelize(a)
- 从 HDFS 中读取;( 非常有用 )
- 从外部数据源读取;
- PairRDD
从外部数据源读取
-
JDBC 所有支持 JDBC 的数据库都可以通过这种方式进行读取,也包括支持 JDBC 的分布式数据库,但是你需要注意的是,从代码可以看出,这种方式的原理是利用多个 Executor 同时查询互不交叉的数据范围,从而达到并行抽取的目的。但是这种方式的抽取性能受限于 MySQL 的并发读性能,单纯提高 Executor 的数量到某一阈值后,再提升对性能影响不大
-
HBase
HBase这种分布式数据库,在数据存储时也采用了分区的思想,HBase 的分区名为 Region,那么基于 Region 进行导入这种方式的性能就会比上面那种方式快很多,是真正的并行导入。
值得一提的是 HBase 有一个第三方组件叫 Phoenix,可以让 HBase 支持 SQL 和 JDBC,在这个组件的配合下,第一种方式也可以用来抽取 HBase 的数据,此外,Spark 也可以读取 HBase 的底层文件 HFile,从而直接绕过 HBase 读取数据。说这么多,无非是想告诉你,读取数据的方法有很多,可以根据自己的需求进行选择。
通过第三方库的支持,Spark 几乎能够读取所有的数据源,例如 Elasticsearch
PairRDD
PairRDD 与其他 RDD 并无不同,只不过它的数据类型是 Tuple2[K,V],表示键值对,因此这种 RDD 也被称为 PairRDD,泛型为 RDD[(K,V)],而普通 RDD 的数据类型为 Int、String 等。这种数据结构决定了 PairRDD 可以使用某些基于键的算子,如分组、汇总等。PairRDD 可以由普通 RDD 转换得到:
复制代码
//val spark: SparkSession = .......
val a = spark.sparkcontext.textFile("/user/me/wiki").map(x => (x,x))
网友评论