rdd 特点
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
sparkContext 是通向spark集群的入口
可在不在集群中的机器上调用spark-submit或者spark shell
spark-submit脚本提供了提供应用到集群最直接的方式。对于Standalone模式而言,Spark目前支持两种部署模式。在Client模式中,Driver程序在提交命令的机器上。在Cluster模式中,Driver从集群中的worker节点中任取一个运行驱动程序。
http://blog.csdn.net/liyaohhh/article/details/50753745
yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的作业。而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。看下下面的两幅图应该会明白(上图是yarn-cluster模式,下图是yarn-client模式):
image.png image.png------------------
standalone master管理worker,进而负责资源调度
worker管理当前计算节点,worker会启动executor来完成真正的计算
standalone基本配置master上只需要配两个文件,sparkenv 中写3个东西 ma s te r ip,master port 这个用于rpc和worker,jdk path
slaves写worker ip
worker机器上只需要sparkenv不用slaves。
在master上startall,在其他worker机器上startall起不来master。master节点通过ssh 命令在其他机器上启动worker。之后worker与master的通信如注册,heartbeat通过rpc tcp协议发送消息。
启动worker,worker主动向master报告自己的cores和memory大小
standalone模式下,sparkshell不制定master则是本机启动。指定master是集群。还可以指定memory与core。
sparksubmit或sparkshell启动了一个用于提交任务的客户端(driver),driver要与master建立连接(sparkcontext),如果不建立sparkconext,worker上的executor不会启动,driver是个进程,里面有个sparkcontext object,这个object与master建立连接,这个连接过程中告诉master executor的core与memory,master会进行资源的分配(master已经知道每个worker的资源),给符合条件的worker发消息,worker启动executor,接下来executor要和driver进行通信,那么executor怎么知道driver在哪儿呢?(driver告诉了master,master告诉worker,worker启动executor,executor就知道了)
sparkcontext创建好后,可以构建dag(trans-trans-action), transformation在driver,action触发提交作业,从后向前切成一个个stage,提交一个个(从第一个)stage。stage是一个个task,放到一个taskset,同一个stage中task的业务逻辑相同(taskset中的task业务逻辑相同),但是数据不同。比如数据有4个partition,那么就有4个task,有两个executor,均分到上面,每个2个task。如果不想均分,要改调度,对master进行一些配置。然后比如要读取hdfs,hdfs️4个分区,然后一个task读取一个分区,然后每个task不是一下全读hdfs,而是拿到迭代器,拿一条读一条。计算结果放在executor内存或disk(不够的时候)。action比如是collect,发给driver。action比如是写hdfs,executor就直接并发写了。
spark 程序:app
用于提交应用程序的: driver
资源管理:master
节点管理:worker
执行真正的业务逻辑:executor
细节
dag怎么切分
资源怎么调度
executor中业务逻辑怎么执行
word count
val rdd = sc.textfile().flatMap(.split(" ")).map(, 1).reduceByKey(_ + _)
textFile 产生hadooprRDD ->mapPartitionsRDD
rdd.toDebugString
rdd.dependencies
flatMap 和map 都是产生mapPartitionsRDD
reduceByKey shuffledRDD
saveAsTextFile mapPartitionsRDD
一共产生6个RDD
经过一系列transformation,action触发提交dag到 dagscheduler,dagscheduler 划分stage,按顺序将一个个taskset(通过taskscheduler)分发到worker。
spark 读hdfs 权限可在程序里设置, system.setproperty("user.name", "name")
spark repartition 有shuffle write
reparition之后cache,相当于cache的是repartition中最后的rdd
val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
val rdd2 = rdd1.repartition(6).cache
val rdd3 = rdd2.map(_ + 1).cache
rdd3.collect
val rdd4 = rdd2.map(_ * 2).cache
rdd4.collect
这样的话在job图里会显示
Paste_Image.png Paste_Image.png在stage中会显示
Paste_Image.png Paste_Image.png所以cache不cache住要在job图里看。即使repartition后的被cache住了,还是会在stage中的第一个框框里显示
Paste_Image.pngval RDD1 = sc.parallelize(List(1,2,3,4,5,6),2)
val RDD2 = RDD1.repartition(6)
val RDD3 = RDD2.map(_ + 1).cache
val RDD4 = RDD3.map(_ * 2)
RDD4.collect
val RDD5 = RDD3.map(_ * 3)
RDD5.collect
job UI
Paste_Image.png Paste_Image.pngstage UI
Paste_Image.png Paste_Image.png Paste_Image.png所以结论是什么呢?比如你cache了某个rdd
网友评论