spark

作者: 博瑜 | 来源:发表于2017-06-12 08:43 被阅读0次

    rdd 特点
    * - A list of partitions
    * - A function for computing each split
    * - A list of dependencies on other RDDs
    * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    * an HDFS file)

    sparkContext 是通向spark集群的入口
    可在不在集群中的机器上调用spark-submit或者spark shell

    spark-submit脚本提供了提供应用到集群最直接的方式。对于Standalone模式而言,Spark目前支持两种部署模式。在Client模式中,Driver程序在提交命令的机器上。在Cluster模式中,Driver从集群中的worker节点中任取一个运行驱动程序。
    http://blog.csdn.net/liyaohhh/article/details/50753745

    yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的作业。而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。看下下面的两幅图应该会明白(上图是yarn-cluster模式,下图是yarn-client模式):

    image.png image.png

    ------------------
    standalone master管理worker,进而负责资源调度
    worker管理当前计算节点,worker会启动executor来完成真正的计算

    standalone基本配置master上只需要配两个文件,sparkenv 中写3个东西 ma s te r ip,master port 这个用于rpc和worker,jdk path
    slaves写worker ip

    worker机器上只需要sparkenv不用slaves。
    在master上startall,在其他worker机器上startall起不来master。master节点通过ssh 命令在其他机器上启动worker。之后worker与master的通信如注册,heartbeat通过rpc tcp协议发送消息。

    启动worker,worker主动向master报告自己的cores和memory大小

    standalone模式下,sparkshell不制定master则是本机启动。指定master是集群。还可以指定memory与core。

    sparksubmit或sparkshell启动了一个用于提交任务的客户端(driver),driver要与master建立连接(sparkcontext),如果不建立sparkconext,worker上的executor不会启动,driver是个进程,里面有个sparkcontext object,这个object与master建立连接,这个连接过程中告诉master executor的core与memory,master会进行资源的分配(master已经知道每个worker的资源),给符合条件的worker发消息,worker启动executor,接下来executor要和driver进行通信,那么executor怎么知道driver在哪儿呢?(driver告诉了master,master告诉worker,worker启动executor,executor就知道了)
    sparkcontext创建好后,可以构建dag(trans-trans-action), transformation在driver,action触发提交作业,从后向前切成一个个stage,提交一个个(从第一个)stage。stage是一个个task,放到一个taskset,同一个stage中task的业务逻辑相同(taskset中的task业务逻辑相同),但是数据不同。比如数据有4个partition,那么就有4个task,有两个executor,均分到上面,每个2个task。如果不想均分,要改调度,对master进行一些配置。然后比如要读取hdfs,hdfs️4个分区,然后一个task读取一个分区,然后每个task不是一下全读hdfs,而是拿到迭代器,拿一条读一条。计算结果放在executor内存或disk(不够的时候)。action比如是collect,发给driver。action比如是写hdfs,executor就直接并发写了。

    spark 程序:app
    用于提交应用程序的: driver
    资源管理:master
    节点管理:worker
    执行真正的业务逻辑:executor

    Screen Shot 2017-06-13 at 8.30.07 AM.png

    细节
    dag怎么切分
    资源怎么调度
    executor中业务逻辑怎么执行

    word count
    val rdd = sc.textfile().flatMap(.split(" ")).map(, 1).reduceByKey(_ + _)

    textFile 产生hadooprRDD ->mapPartitionsRDD
    rdd.toDebugString
    rdd.dependencies
    flatMap 和map 都是产生mapPartitionsRDD
    reduceByKey shuffledRDD
    saveAsTextFile mapPartitionsRDD
    一共产生6个RDD

    image.png

    经过一系列transformation,action触发提交dag到 dagscheduler,dagscheduler 划分stage,按顺序将一个个taskset(通过taskscheduler)分发到worker。

    spark 读hdfs 权限可在程序里设置, system.setproperty("user.name", "name")

    spark repartition 有shuffle write
    reparition之后cache,相当于cache的是repartition中最后的rdd

    Paste_Image.png

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
    val rdd2 = rdd1.repartition(6).cache
    val rdd3 = rdd2.map(_ + 1).cache
    rdd3.collect
    val rdd4 = rdd2.map(_ * 2).cache
    rdd4.collect

    这样的话在job图里会显示

    Paste_Image.png Paste_Image.png

    在stage中会显示

    Paste_Image.png Paste_Image.png

    所以cache不cache住要在job图里看。即使repartition后的被cache住了,还是会在stage中的第一个框框里显示

    Paste_Image.png

    val RDD1 = sc.parallelize(List(1,2,3,4,5,6),2)
    val RDD2 = RDD1.repartition(6)
    val RDD3 = RDD2.map(_ + 1).cache
    val RDD4 = RDD3.map(_ * 2)
    RDD4.collect
    val RDD5 = RDD3.map(_ * 3)
    RDD5.collect

    job UI

    Paste_Image.png Paste_Image.png

    stage UI

    Paste_Image.png Paste_Image.png Paste_Image.png

    所以结论是什么呢?比如你cache了某个rdd

    相关文章

      网友评论

          本文标题:spark

          本文链接:https://www.haomeiwen.com/subject/bbokqxtx.html