1.自动进行内存和磁盘存储的切换
spark会优先将数据存储在内存中,如果内存放不下,才把数据写入磁盘,不但能计算内存中的数据,也能计算内存放不下的数据。
2.基于Lineage(血统)高容错机制
Lineage是基于spark的依赖关系来完成,每个操作只关联父操作,各分片之间的数据互不影响,出现错误的时候只需要恢复单个的split特定部分。
常规容错方式有两种:
数据检查点
通过数据中心的网络连接各台机器,如果发生checkPoint的时候就需要复制数据,复制是要通过网络传输的,因此网络宽带是分布式的瓶颈,对存储的资源也是很大的消耗。
记录数据的更新
当有数据更新的时候,就需要记录数据,这种方式不需要复制数据集。
- RDD是不可变的且lazy的
- RDD的写操作是粗粒度的、读操作可以是粗粒度,也可以是细粒度。
3.Task失败会进行特定次数的重试
默认重试次数是4次。TaskSchedulimpl的源码如下:
def this(sc: SparkContext) = {
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}
private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)
4.Stage失败,会自动进行特定次数的重试
Stage可以跟踪多个StageInfo(存储SparkListener监听到的所有Stage信息,将Stage信息传递给Listeners或web UI)。重试默认次数是4次,且可以直接运行计算失败的阶段,只计算失败的数据分片,具体Stage源码如下:
private[spark] object DAGScheduler {
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
// Number of consecutive stage attempts allowed before a stage is aborted
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
5.checkpoint和persist(检查点和持久化),可以主动或被动触发
checkpoint是对RDD进行的标记,会产生一系列的文件,且所有父依赖都会被删除,是整个依赖的终点。checkpoint是lazy级别的。
persist后,RDD的每个分片会保存在内存或磁盘中,下一次使用相同RDD进行其他action计算的时候,就可以重用。
6.数据调度弹性、DAGSchedule、TaskSchedule调度和资源调度无关
spark讲执行模型抽象成有向无环图(Stage),各个Stage之间可以串行或这并行,从而不需要把Stage的中间结果输出到HDFS中,当节点发生故障时,其他节点可以替代该节点运行。
网友评论