概述
总体来说,每个Spark应用程序都包含一个驱动程序
,运行了用户的main
函数并且在集群上执行多种并行操作
。Spark提供的主要抽象就是弹性分布式数据集(RDD)
,它是跨节点的元素集合,可以并行操作。RDD可以由Hadoop文件系统(或者其它Hadoop支持的文件系统)的文件创建,也可以通过转换驱动程序中已存在的Scala集合创建。用户可以在内存中缓存RDD,方便在并行操作之间有效地重用RDD。最后,RDD可以自动从节点错误中恢复。
Spark的第二个抽象是共享变量,可用于并行操作。默认情况下,当Spark将一个函数作为不同节点上的一组任务并行执行时,会把函数中每个变量副本分发给各个任务。有时候,变量需要在任务之间共享,或者在任务和驱动程序之间共享。Spark支持两种共享变量:broadcast variables(广播变量)
,用于在所有节点内存中缓存变量,accumulators(累加器)
,只允许"add"
操作,如计数和求和。
这篇编程指南使用Scala展示Spark的特性。学习Spark最简单的方式就是使用交互式shell,Scala语言用bin/spark-shell
。
与Spark建立连接
Spark 2.1.1默认使用Scala2.11。(Spark也可以重新构建,适配其它版本的Scala)。用Scala编写应用程序,需要使用兼容版本(如2.11.X)。
编写Spark应用程序,需要添加Spark的Maven依赖。
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.1.1
另外,如果想要访问HDFS集群,需要添加对应HDFS的hadoop-client
依赖。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,需要在程序中引入一些Spark类。添加下面的内容:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
在Spark 1.3.0之前,需要显示地添加import org.apache.spark.SparkContext._
来启用隐式转换。
初始化Spark
Spark程序要做的第一件事就是创建一个SparkContext对象,Spark用它来访问集群。创建SparkContext
对象之前要先创建SparkConf对象,其中包含了应用程序的信息。
每个JVM中只能有一个活跃的SparkContext。创建新的SparkContext之前必须stop()
当前活跃的SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
appName
参数是应用程序的名字,在集群的UI上显示。master
是一个Spark, Mesos or YARN cluster URL,或者是一个特殊的"local"
字符串来运行本地模式。在实践中,当在集群上运行时,不要在程序中硬编码master
,而是使用spark-submit启动应用程序,然后从那里接收master
参数。如果是本地测试或者单元测试,可以直接传"local"
运行。
使用Shell
在Spark shell中,SparkContext已经为你创建好了,变量名字为sc
。自己创建SparkContext是不好使的。可使用--master
参数设置上下文,使用--jars
参数添加JAR包,后面跟一个逗号分隔的list。也可以给shell session添加依赖(如Spark Packages),使用--packages
参数后面跟上逗号分隔的maven coordinates列表。存在依赖关系的附加库(如Sonatype)可传给--repositories
参数。例如,在4核上运行bin/spark-shell
,使用:
$ ./bin/spark-shell --master local[4]
或者,添加code.jar
,使用:
$ ./bin/spark-shell --master local[4] --jars code.jar
使用maven coordinates引入依赖:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
运行spark-shell --help
查看所有参数列表。spark-shell
调用了更通用的spark-submit脚本。
网友评论