hadoop和spark比较
Hadoop的局限性
- 处理效率低
1、map的中间结果写磁盘,reduce写hdfs,多个mr之间的数据通过hdfs交换数据
2、任务调度和启动开销过大
3、无法充分利用内存
4、map端和reduce端均需要排序 - 不适合迭代计算(如机器学习,图计算,)交互式处理(数据挖掘)和流式处理(点击日志查询分析)
- hadoop 编程不够灵活
1、仅支持map和reduce两种操作
2、尝试函数式编程风格
saprk的特点
- 高效
1、内存式计算引擎,提供cashe机制来避免反复计算,减少io开销
2、DAG (directed acyclic graph、有向无环图) 引擎,减少多次计算之间中间结果写到hdfs的开销
3、使用多线程池模型来减少task启动的开销,shuffle过程中避免不必要的sort操作以及减少io的操作 - 易用
1、提供了丰富的api,支持四种语言(java scala python,r)
2、代码量少 - 与hadoop集成
1、读写hdfs、hbase
2、运行与yarn上
spark 名词解释
DAG
- 有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
RDD
- resilient distributed datasets 弹性分布式数据集(物理不存在,只是一个逻辑概念)
Driver
- 程序入口(main函数执行的地方)
- 程序运行完成之后,汇总计算结果
- driver和executor之间用心跳来维持(executor 汇报运行状态和统计信息)
Partition
- partition是spark rdd计算的最小单元。为什么是最小单元?先从分布式说起,分布式计算的特点就是批处理,将大量的数据分成若干批次,使得利用廉价机器搭建的集群也可以完成海量数据的计算。大量的数据分散在集群中的若干节点上,每个节点上的那部分数据,在执行计算的时候,又可以切分成若干份,每一份就是一个批次,也就是一个partition。(一个节点上存在多个partition)
- partition数量的决定
1、从内存中创建
sc.parallelize(...)这种方式创建的partition数量等于SparkContext的defaultParallelism的值,或者等于传给parallelize的numSlices参数。
2、读取HDFS文件创建
包括使用sc.textFile(...)或spark.sql(...)查询Hive表得到的结果的partition数量等于HDFS中的文件块数。
3、转化过程中partition的数量
filter(), map(), flatMap(), distinct()partition数量等于parent RDD的数量。
rdd.union(other_rdd)partition数量等于rdd_size + other_rdd_size
rdd.intersection(other_rdd) partition数量等于max(rdd_size, other_rdd_size)
rdd.subtract(other_rdd) partition数量等于rdd_size
rdd.cartesian(other_rdd) partition数量等于rdd_size * other_rdd_size
Excutor
-
Executor是spark任务(task)的执行单元,运行在worker上,但是不等同于worker,实际上它是一组计算资源(cpu核心、memory)的集合。一个worker上的memory、cpu由多个executor共同分摊。
-
executor主要参数示例
1、spark.executor.cores:顾名思义这个参数是用来指定executor的cpu内核个数,分配更多的内核意味着executor并发能力越强,能够同时执行更多的task
2、spark.cores.max :为一个application分配的最大cpu核心数,如果没有设置这个值默认为spark.deploy.defaultCores
3、spark.executor.memory:配置executor内存大小 -
总结:executor 数量 = spark.cores.max/spark.executor.cores
spark.cores.max 是指你的spark程序需要的总核数
spark.executor.cores 是指每个executor需要的核数
Task
- spark 的task 分为两种(shuffleMapTask和resultTask)
- 指定并行的task数量
spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
- 小结(个人理解)
1、在我看来,task 任务是在driver端产生,由driver生成task后,主动发送给executor;(因为之前和别的同学讨论过task是executor拉的还是driver推的问题,也不知道提交task是不是由下面说的taskScheduler来执行的)
2、如果要讨论partition和task的数量问题,在一个stage(阶段中),一个partition将有一个task完成(可以说task和partition是对应的),不同的stage中的task种类有可能不一样(task由shufflemaptask和resulttask组成)
3、driver提交任务后cluter manege 将task提交到executor后,task中包含(具体任务和运行任务所需要的依赖)executor首先接受到命令后首先根据task携带的依赖下载依赖,依赖下载完成后,准备就绪后开始执行task;
Stage
- 要想即使stage(阶段)是干啥的,必须先弄清楚一个概念-scheduler(任务调度)
DAGScheduler 分析用户提交的应用,根据依赖关系建立DAG,然后将DAG划分为不同的stage(阶段),其中每个stage由并发执行的一组task构成,这些task执行的逻辑完全一样,只是数据不一样而已; - stage的开始和结束
一个stage的开始就是从外部储存或者shuffle结果中读取数据,一个stage的结束就是由于发生了shuffle或者生成结果时;
Scheduler
- spark的任务调度模块分为两大类:DAGScheduler和TaskScheduler
1、DAGScheduler:负责将用户提交的计算任务按照DAG划分为不同的阶段;
2、TaskScheduler:将计算任务提交到集群进行计算
备注(除此之外还有一个ScheduleBackend,作用是分配资源的)
Job
- job来源
原始的rdd经过一系列转换在最后一个action会触发一个动作,这个动作会生成一个job; - job提交
DAGScheduler将job 提交由TaskSCheduler最后提交至executor计算运行
Shuffle
-
什么是Shuffle?为什么要Shuffle?
shuffle (洗牌),目的是为了使具有某种共同风格的数据重新汇聚到一个计算节点上; -
shuffle是会进行磁盘读写的,因为shuffle是将几百甚至几千机器上的数据汇聚到一个节点或者partition上,内存有可能装不下,这个过程会发生多次硬盘续写;
-
首先,哪种rdd间的操作会需要shuffle呢?
这取决于该rdd和它的所依赖的rdd 的关系(rdd和它依赖的parent rdd 的关系有两种不同的类型:窄依赖和宽依赖)
-----------------很烦呀,shuffle没弄明白又出来一个宽依赖(wide dependency)和窄依赖(narrow dependency).....
1、窄依赖(narrow dependency):由于rdd的每个partition依赖固定数量的parent rdd 的partition,因此可以用一个task来处理这些partition,这些partition相互独立,所以这些task 可以并行计算
2、宽依赖(wide dependency):由于需要shuffle ,因此所有的 rdd 的parent partition shuffle完成之后,新的partition 才会出现,这样接下来的task才能被处理,因此,宽依赖可以认为是dag的分割线,或者说spark根据宽依赖将job划分为不同的阶段(stage)
BlockManage
- BlockManager 是管理整个Spark运行时的数据读写的,也包含数据存储本身,进行读写操作,(由于 Spark 本身是分布式的,所以 BlockManager 也是分布式的,也包含缓存)
网友评论