在学习了Spark RDD和RDD操作之后,是不是很想快点写个Spark程序来巩固一下所学的知识。学习大数据开发怎么能少了wordcount呢?
话不多说,直接上代码:
val lines = sc.textFile("data/dataset.txt")
val result = lines.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
result.collect()
.foreach(x => println(x._1 + " = " + x._2))
输出结果:
Deer = 2
Bear = 2
Car = 3
River = 2
短短几行代码就搞定了WordCount,想想写的MapReduce的WordCount程序,Spark真是好用太多。
接下来,我们要利用刚刚编写的程序,来干些其他的事情...
首先,我们可以通过Spark提供的toDebugString
方法来查看RDD的谱系:
result.toDebugString
输出:
(2) ShuffledRDD[4] at reduceByKey at WordCount.scala:21 []
+-(2) MapPartitionsRDD[3] at map at WordCount.scala:20 []
| MapPartitionsRDD[2] at flatMap at WordCount.scala:19 []
| data/dataset.txt MapPartitionsRDD[1] at textFile at WordCount.scala:17 []
| data/dataset.txt HadoopRDD[0] at textFile at WordCount.scala:17 []
从输出信息中,我们可以看到WordCount程序中创建出的一些RDD。这里要从下往上看。
调用sc.textFile
创建出了一个MapPartitionsRDD。实际在textFile
的内部是先创建出了一个HadoopRDD,然后对该RDD进行映射操作,最终得到了MapPartitionsRDD。然后再经过flatMap
、Map
和reduceByKey
等转换操作,后一个RDD依赖于前一个RDD。最后调用collect
行动操作,生成一个Job。此时Spark调度器会创建出用户计算行动操作的RDD的物理执行计划。Spark调度器会从最终被调用的collect()
操作的RDD出发,向上回溯所有必须计算的RDD。调度器会访问RDD的父节点,父节点的父节点,依次类推,递归向上生成计算所有必要的祖先RDD的物理计划。
DAG的生成
原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD是由哪些Parent RDD(s)转换而来和它依赖Parent RDD(s)的哪些Partitions。通过这些依赖关系,就形成了RDD的Lineage(血统)。借助Lineage,可以保证一个RDD被计算前,它所依赖的Parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么可以通过重新计算这部分丢失的数据而不用重新计算所有数据。
如何根据DAG生成计算任务
根据依赖关系的不同,将DAG划分为不同的阶段(Stage)。对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程中完成,窄依赖被Spark划分到同一个Stage;对于宽依赖,由于Shuffle的存在,只能在Parent RDD(s)处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据。在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行。Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有Parent Stage或者Parent Stage都已经执行完成后,才可以执行。
在上面WordCount例子中,在执行转换操作reduceByKey
时会触发一个Shuffle(洗牌)的过程。因此,会将整个Job从此处拆分,形成两个Stage。在Spark WebUI(http://127.0.0.1:4040)中,我们可以看到:
Stage1依赖于Stage0,因此Stage0要先执行,然后再执行Stage1。下面再看看Stage0和Stage1中的RDD转换关系。
Stage0在Stage0中从读入文件到最终的MapPartitionsRDD是流水线执行的。在Stage0中生成了2个task,如下图所示:
imageStage0 Task的输出作为Stage1 Task的输入,得到最终结果。
Stage1 image小结
RDD经过一系列的转换之后,会在最后一个RDD上调用一个行动操作,这时会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。Spark中Task分为ShuffleMapTask
和ResultTask
两种。DAG的最后的一个Stage(对应Stage1)会为每个结果的Partition生成一个ResultTask,其余所有的Stage都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务。
一个Spark执行流程:
- 用户代码定义RDD的有向无环图
RDD上的操作会创建出新的RDD,子RDD会引用父RDD,形成DAG。 - 行动操作把有向无环图强制转译为执行计划
当调用一个RDD的行动操作时,这个RDD就必须被计算出来。这也要求计算出该RDD的父节点。Spark调度器提交一个作业来计算所有必要的RDD。这个作业会包含一个或者多个步骤,每个步骤对应一批计算任务。一个步骤也对应DAG中的一个或多个RDD(流水线执行)。 - 任务在集群中调度并执行
一个任务处理一个分区数据,在调用RDD的行动操作后,会生成很多任务,Spark调度器会将任务调度到Worker上进行执行。一旦作业的最后一个步骤结束,一个行动操作也就执行完毕了。
好文推荐:
网友评论