Spark RDD 编程指南(官方文档中文版+补充)
1.总览
Spark 提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以并行操作。通过从hadoop文件系统中文件或现有的scala集合通过转换来创建RDD.用户还可以将RDD保存在内存中,以使其能够在并行操作中有效的重用,最后RDD能够自动从节点故障中恢复。
Spark的第二个抽象是可以在并行操作中使用的共享变量.Spark 支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)、累加器
2. 与spark 连接
要编写spark程序,需要在spark上添加maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.4</version>
</dependency>
另外如果需要访问HDFS 集群,则需要hadoop-client 为您的HDFS版本添加依赖项(版本根据集群hdfs 选择)
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.6</version>
</dependency>
另外需要将spark 类导入到程序中,添加以下行:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在Spark 1.3.0之前,您需要显式import org.apache.spark.SparkContext._启用必要的隐式转换。)
3. 弹性分布式数据集(RDD)
Spark围绕弹性分布式数据集(RDD)的概念展开
,RDD是可并行操作的元素的容错集合。创建RDD的方法有两种:并行化驱动程序中的现有集合、或引用外部存储系统中的数据集。
- 并行集合
通过在驱动程序中现有的集合上调用SparkContext 的parallelize 方法来创建并行集合。复制集合的元素以形成可以并行操作的分布式数据集。
如下创建包含数字1到5的并行化集合的方法:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
- 外部数据集
Spark可以从hadoop支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra,HBase,Amazon S3 等。Spark 支持文本文件,SequenceFile 和其他hadoop InputFormat。
可以使用sparkContext 的textfile 方法创建文本文件RDD,如:
val distFile = sc.textFile("data.txt")
3.1 RDD操作
RDD支持两种类型的操作:Transformation(从现有操作中创建新数据集)和Action。在对数据集执行计算后,将值返回给驱动程序。
spark的所有转换都是惰性的,因为他们不会立即计算出结果。相反,它们只记得应用于某些基本数据集的转换。仅当动作要求将结果返回给驱动程序时才计算转换,也就是说只有action方法才会触发job的执行。
RDD的缓存:Spark程序的优化手段,当一个RDD 会被后续的很多计算进行使用的时候,把这个RDD放在缓存中会提高程序的运行效率。
- cache:把数据缓存在内存中
- persist:接受存储 级别来决定把数据缓存在内存或磁盘上,如果不传参,默认缓存在内存中
- unpersist:把RDD从缓存中移除
3.2 将函数传递给Spark
Spark的API在很大程度上依赖于在驱动程序中传递函数以在集群上运行。有两种推荐度的方法可以做到这一点:
- 匿名函数法,可用于简短的代码段。
- 全局单例对象中的静态方法。
//--------------------------------------
//【不推荐】 整个对象都需要被发送到集群
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
//--------------------------------------
//【不推荐】 引用了所有内容this
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
//---------------------------------------
// 【推荐做法】
class MyClass {
val field = "hello"
def doStuff(rdd: RDD[String]): RDD[String] = map{
val field_ = this.field
rdd.map(x=>field_+x)
}
}
注意:如上代码示例,全局单例对象的静态方法在应用过程中为了避免引用整个对象或this的全部内容,最简单的方法是将其复制到局部变量中,而不是从外部访问它。
3.3 了解闭包
关于spark的难点之一是在跨集群执行代码时了解变量和方法的范围和生命周期。 修改超出其范围的变量的RDD操作可能经常引起混乱。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
上面的代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务由执行程序执行。在执行之前,Spark计算任务的闭包。闭包是执行者在RDD上执行器计算时必须可见的那些变量和方法。此闭包被序列化并发送给每个执行者。
发送给每个执行者的闭包中的变量均为副本,因此在foreach函数中引用counter时,它不再是驱动程序节点上的counter。驱动程序节点内存中仍有一个counter,但它不再对执行者可见! 执行者只能从序列化闭包中看到副本。因此,对counter 的所有操作都引用了序列化闭包内的值,所以counter的最终值仍将为0。
为确保此类情况下情况明确,应使用累加器(Accumulator),Spark中的累加器专门用于在集群中的各个工作节点之间拆分执行是安全的更新变量。
3.4 RDD的打印元素
//打印驱动器上的所有元素,需要使RDD到驱动器节点
rdd.collect().foreach(println)
// 该方法可能会导致驱动程序内存不足,因为collect()将整个RDD提取到一台计算机上
// 更安全的方法是使用take()
rdd.take(100).foreach(println)
4 共享变量
4.1 广播变量
广播变量是在driver上声明,在executor上使用的一些变量数据,广播变量对executor来说是只读的。
使用步骤:
- 1.在driver上声明广播变量
- 2.在executor执行的函数中使用这个广播变量
工作过程中,经常在两张表关联的时候使用广播变量。
spark上对两个RDD关联的时候,如果一个RDD的数据量很小,一个RDD的数据量很大,可以使用广播变量把关联过程变成map端关联。把小表的数据变成广播变量,广播给每一个加载大表RDD的执行者。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
4.2 累加器
累加器是在driver上声明,在executor上进行累加计数,累加器对executor来说是只写
使用步骤:
- 1.在driver上声明累加器
- 2.在executor上对累加器进行累加数据处理
- 3.在driver上读取不得到累加结果
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
Spark本身支持数字类型的累加器,程序员也可以添加对新类型的支持
5. 部署到集群
5.1 程序直接发送到spark集群运行
大多用于开发、调试阶段
把Master的url设置成集群主节点服务的url
val conf = new SparkConf().setAppName("wc").setMaster("spark://centos1:7077")
把本地的项目打包(jar包),然后在sparkContext 上添加:
val sc = new SparkContext(conf)
sc.addJar("E:\\resources\\sparktest23\\out\\artifacts\\sparktest23_jar\\sparktest23.jar")
然后在idea上右键运行就能把程序发送到Spark集群来运行。
5.2 项目打包,使用spark的submit指令发布到spark集群运行
生产环境大多使用此种方式,并通过调度系统进行调度执行。
spark2-submit
--class com.jingtoo.eportfolios.spark.UnitInsurance ./unit_insurance.jar
--master yarn?
--deploy-mode cluster
--num-executors 25
--executor-cores 2
--executor-memory 8G
--driver-memory 2g
官方原版(RDD编程指南):http://spark.apache.org/docs/latest/rdd-programming-guide.html
网友评论