美文网首页
Spark中的Job、Stage、Task

Spark中的Job、Stage、Task

作者: 严国华 | 来源:发表于2020-08-26 11:32 被阅读0次

    原文链接:https://www.jianshu.com/p/e1243537d0fd

    写在前面

    台风夜的电话面试里被问到了spark运行任务的过程中stage的划分依据。一下子就给整懵了,支支吾吾答非所问。从事大数据的开发也有一年半光景,spark任务的运行原理依旧知之甚少。因此就参阅各种优秀的文章,再配上一个自己工作中的实际项目,特意整理出这篇笔记,以此警示自己的自大与无知。

    测试环境

    本地开发环境

    idea 2019.1.2

    maven 3.6

    spark 2.4.3

    scala 2.1.8

    jdk1.8

    测试集群环境

    spark 2.4.3

    scala 2.1.8

    jdk1.8

    hadoop2.7.4

    集群

    测试项目例子

    计算某一天店铺销售额\时段销售额top10

    样例数据字段格式

    filed1|filed2|filed3|store_no|filed5|filed6|filed7|filed8|amount|filed10|filed11|sale_time

    这里不提供具体的测试数据,实验过程中需要自己模拟所用的数据。

    样例demo

    packagecom.dr.leoimportcom.dr.leo.utils.StrUtilsimportorg.apache.hadoop.io.{LongWritable,Text}importorg.apache.hadoop.mapred.{FileSplit,InputSplit,TextInputFormat}importorg.apache.spark.SparkContextimportorg.apache.spark.rdd.{HadoopRDD,RDD}importorg.apache.spark.sql.SparkSession/**

      * @author leo.jie (weixiao.me@aliyun.com)

      * @organization DataReal

      * @version 1.0

      * @website https://www.jlpyyf.com

      * @date 2019-07-28 20:29

      * @since 1.0

      */objectWordCount{defmain(args:Array[String]):Unit={valspark:SparkSession=SparkSession.builder().appName("WordCount")//.master("local[*]").enableHiveSupport().getOrCreate()valsc:SparkContext=spark.sparkContextvalfileRddOri=loadFileToRdd(sc,"hdfs://leo/test/pos.DAT")valfileRdd=fileRddOri.map(x=>for(data<-x._2.split("\\|"))yieldif(data==null)""elsedata.trim).filter(x=>x.length==12).map(x=>(retailer_shop_code(x(3)),x(10).substring(10,13),x(8).toFloat)).map(x=>((x._1,x._2),x._3)).reduceByKey(_+_)fileRdd.top(10)(Ordering.by(e=>e._2)).foreach(println(_))println("##########################################################")fileRdd.map(x=>(x._1._1,x._2)).reduceByKey(_+_).top(10)(Ordering.by(e=>e._2)).foreach(println(_))println("##########################################################")println(fileRdd.count())println("##########################################################")println(fileRdd.first())println("##########################################################")fileRdd.take(10).foreach(println(_))while(true){;}spark.stop()}/**

        * 读取gbk编码的file

        * @param sc

        * @param path

        * @param encoding

        * @return

        */defloadFileToRdd(sc:SparkContext,path:String,encoding:String="GBK"):RDD[(String,String,Int)]={sc.hadoopFile[LongWritable,Text,TextInputFormat](path).asInstanceOf[HadoopRDD[LongWritable,Text]].mapPartitionsWithInputSplit((inputSplit:InputSplit,iterator:Iterator[(LongWritable,Text)])=>{valfile=inputSplit.asInstanceOf[FileSplit]iterator.filter(x=>x._2!=null).map(x=>{(file.getPath.getName,newString(x._2.getBytes,0,x._2.getLength,encoding),1)})})}/**

        * 只是一个店铺号转换的函数

        * @param retailer_shop_code

        * @return

        */defretailer_shop_code(retailer_shop_code:String):String={if(StrUtils.isBlank(retailer_shop_code))""elseif(retailer_shop_code.length==5)retailer_shop_code.substring(0,retailer_shop_code.length-1).toUpperCase()elseif(retailer_shop_code.length==6)retailer_shop_code.substring(0,retailer_shop_code.length-2).toUpperCase()elseif(retailer_shop_code.length==8)retailer_shop_code.substring(0,retailer_shop_code.length-2).toUpperCase()elseretailer_shop_code}}

    运行测试

    程序打包后发往集群提交任务。所用命令

    [hadoop@node1leo_demo]$ spark-submit--master yarn--deploy-mode cluster--driver-memory2G--driver-cores2--executor-memory2g--num-executors5--executor-cores2--conf spark.yarn.executor.memoryOverhead=2048--conf spark.network.timeout=30000--classcom.dr.leo.WordCountleo-study-spark.jar

    spark-ui上的信息告诉了我们什么?

    查看任务的运行信息

    spark任务运行的过程中,我们可以点击 <code style="color:red">ApplicationMaster</code> 跳转任务运行的界面。

    yarn

    运行流程之:job

    job

    此时我们提交的任务的所有job都已经运行成功,只因为程序中任务执行完毕后是一段无限循环,所以这个界面会一直存在,直到我们手动在yarn上kill掉这个application。

    我们写的代码被提交运行的过程中,会先被划分为一个又一个job,这些job按照被划分的先后顺序会依次执行。

    图示中我们已经知道,我们提交的任务,最终被划分成了5个job。

    所谓一个job,就是由一个rdd action触发的动作。简单理解为,当你需要执行一个rdd的action操作的时候,就会生成一个job。

    这里不会赘述什么是rdd的action操作。

    结合代码与图示我们可以知道:

    job-0 的产生是由于触发了top操作

    <code style="color:red">top at WordCount.scala:33</code>

    job-1 的产生是由于触发了top操作

    <code style="color:red">top at WordCount.scala:35</code>

    job-2 的产生是由于触发了count操作

    <code style="color:red">count at WordCount.scala:37</code>

    job-3 的产生是由于触发了first操作

    <code style="color:red">first at WordCount.scala:39</code>

    job-4 的产生是由于触发了take操作

    <code style="color:red">take at WordCount.scala:41</code>

    运行流程之:stage

    选择任意一个job,点击链接去查看该job的detail,这里我们选择job-0。

    job-detail

    由图示我们可知,job-0由两个stage组成,并且每个stage都有8个task,说明每个stage的数据都在8个partition上。

    下面我们将详细说明stage以及stage的划分依据。

    stage 概念、划分

    貌似没有十分明确的概念来十分清楚地说明spark stage究竟是什么?这里只记录从众多优秀的博客里提取的直言片语,以及本人的一点见解。

    stage的划分是以shuffle操作作为边界的。也就是说某个action导致了shuffle操作,就会划分出两个stage。

    stage的划分在RDD的论文中也有详细介绍,简单的说是以shuffle和result这两种类型来划分。在spark中有两类task,一类是shuffle map task,一类是result task,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也依次为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。

    在DAGScheduler中,会将每个job划分成多个stage,每个stage会创建一批task并且计算task的最佳位置,一个task对应一个partition。DAGScheduler的stage划分算法如下:它会从触发action操作的那个RDD开始往前推,首先会为最后一个RDD创建一个stage,然后往前倒推的时候,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD,然后依次类推,继续往前倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。

    spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。<code style="color:red">遇到宽依赖就划分stage</code>,每个stage包含一个或多个task任务。然后将这些task以task set的形式交给TaskScheduler运行。<code style="color:red">stage是由一组并行的task组成</code>。

    窄依赖

    父RDD和子RDD partition之间的关系是一对一的。不会有shuffle的产生。父RDD的一个分区去到子RDD的一个分区中。

    宽依赖

    父RDD与子RDD partition之间的关系是一对多的。会有shuffle的产生。父RDD的一个分区去到子RDD的不同分区里面。

    其实区分宽窄依赖,主要就是看父RDD的一个partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。以WordCount为例,看图理解:

    spark stage

    运行流程之:task

    task是stage下的一个任务执行单元,一般来说,一个rdd有多少个partition,就会有多少个task,因为每一个task只是处理一个partition上的数据。

    作者:NikolasNull

    链接:https://www.jianshu.com/p/e1243537d0fd

    来源:简书

    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

          本文标题:Spark中的Job、Stage、Task

          本文链接:https://www.haomeiwen.com/subject/fjuxsktx.html