输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block。
当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。
随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。
随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。
每个节点可以起一个或多个Executor。
每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
每个Task执行的结果就是生成了目标RDD的一个partiton。
注意: 这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。
而 Task被执行的并发度 = Executor数目 * 每个Executor核数。
至于partition的数目:
对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
在Map阶段partition数目保持不变。
在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。
转自:http://www.zezhi.net/409.html
https://www.zhihu.com/question/33270495/answer/93424104
另外两个spark开发中经常遇到的小问题:
1、RDD的API所引用的所有对象,都必须是可序列化的
![](https://img.haomeiwen.com/i16845595/815bb885a9a29050.png)
在RDD的API里所引用的在RDD的API里所引用的所有对象,都必须是可序列化的,因为RDD分布在多台机器是,代码和所引用的对象会序列化,然后复制到多台机器,所以凡是被引用的数据,都必须是可序列化的。否则会报java.lang.NotSerializableException: scala.util.Random 异常,解决办法就是把引用对象序列化 extends Serializable或者使用kryo序列化。
2、在一个RDD的api里不可以引用另外一个RDD
SPARK-5063 in spark,Spark does not support nested RDDs or performing Spark actions inside of transformations; this usually leads to NullPointerExceptions (seeSPARK-718 as one example). The confusing NPE is one of the most common sources of Spark questions on StackOverflow:
上边英文大致意思是:Spark的transformation算子中不支持嵌套RDD,会导致空指针,如果其中一个RDD数据量不大,则可以用文章中提高的广播变量解决这个问题,如果数据量很大使用广播变量会导致OOM,那么就要从其他方面进行优化或者从业务逻辑进行出发。
网友评论