第十五章 Spark如何在集群上运行
Spark应用程序的体系结构
- Spark驱动器:控制你应用程序的进程。它负责控制整个Spark应用程序的执行并维护着Spark集群的状态
- Spark执行器:一个进程,负责执行由Spark驱动器分配的任务
-
集群管理器:集群管理器负责维护一组运行Spark应用程序的机器,集群管理器管理物理机,而不是进程。集群有自己的driver和worker,要和spark的区分开。如下图是一个集群的示例
集群示例
执行模式
集群模式
集群管理器负责维护所有与Spark应用程序相关的进程,集群的某个worker上会有spark的驱动器
黄点是集群的driver,实线的黄色方框是Spark的驱动器
Spark集群模式
客户端模式
Spark的驱动器保留在提交应用程序的客户端及其上,客户端机器维护Spark驱动器进程
Spark客户端模式
本地模式
在一台机器上运行整个Spark应用程序,通过单机的线程实现并行,不是生产级的运用
Spark应用程序的生命周期(Spark外部)
-
客户端提交一个应用程序
提交程序 -
启动,初始SparkSession,SparkSession会与集群管理器驱动节点通信,然后在由它在集群上启动执行器
启动 -
执行
执行Spark程序 -
完成,由集群管理器关闭执行器
完成
Spark应用程序的生命周期(Spark内部)
- 每个应用程序由一个或多个Spark作业组成,应用程序内的一系列Spark作业是串行执行的(除非设置了多线程)
- 所有Spark任务的第一步都是创建SparkSession
- Spark作业会根据具体有几次shuffle操作划分阶段
- 一次shuffle操作意味着对数据的物理重分区,每次shuffle之后开始一个新阶段
- 分区数量应当大于集群上执行器的数量
- 每个任务都对应于一组数据和一组将在单个执行器上运行的转换操作
- 任务是应用于每个数据单元(分区)的计算单位
- 如果不需要跨节点的数据移动,就可以将这一系列操作合并为一个单独的任务阶段,不用每一次都写内存、磁盘
- Spark执行shuffle操作时,总是首先让前一阶段的源任务将要发送的数据写入到本地磁盘的shuffle文件上。这有利于任务的调度、容错,重新执行时已经shuffle好的数据也不会再执行
第十八章 监控与调试
- 要监控两个方面:运行应用程序的进程信息、查询执行过程
Spark UI
SQL Tab
Spark UI SQL tab每一个蓝色框代表Spark任务的一个阶段,所有这些阶段都代表一个Spark作业
- Scan是读入数据
- Exchange一般是shuffle分区
- Project(选择、添加、过滤列)
- HashAggregate是聚合操作
- Join可以是SortMergeJoin BroadcastJoin,后者用于大小表连接
其他Tab
- Storage 显示有关集群上缓存的RDD/DataFrame信息
- Environment显示运行环境的信息
调试和Spark抢救方案
Spark作业未启动
- 可能是因为没有正确打开某个指定端口
- 应用程序为每个执行器进程请求了过多资源
执行期间错误
- 输入数据格式是否正确
- 查询中的列名拼写、列(表、视图)不存在
- 使用一定能正确运行的数据集隔离检查问题
任务运行缓慢
- 增加分区数
- 重新分配分区
- 分配更多内存
- 定位是否存在不健康节点
- 检查UDF UDAF的使用
- 开启推测执行
缓慢的聚合操作
- 增加分区数量
- 增加内存
- 确保过滤和映射操作在聚合之前完成
- 空值、热点值特殊处理
- 一些聚合操作中间值无法节省空间,比如collect_list
缓慢的链接操作
- 优先执行过滤
- 连接前进行分区,不过分区是要shuffle的
- 数据倾斜的处理
缓慢的读写操作
- 开启推测执行
- 确保网络状态
- 如果在相同节点上运行Spark与HDFS等分布式文件系统,确保Spark与文件系统节点主机名相同
驱动器OOM或无响应
- 可能尝试了collect等操作将过大的数据集收集到驱动器节点
- 广播的数据量太大
- 应用程序长时间运行导致驱动器进程生成大量对象,并且无法释放它们
- 如果有和Python等语言绑定,数据序列化反序列化需要占用JVM大量内存
执行器OOM或无响应
- 检查GC情况
- 增大内存
- 处理空值
第十九章 性能调优
- 依赖于网络性能的shuffle操作往往是Spark作业中开销最大的一个步骤
- 两种方法可以指定Spark作业之外的执行特性:一是合理配置改变运行环境;二是指定某个Spark作业、某个阶段、某个任务的执行特性
间接性能优化
设计选择
- 在所有语言环境下,DataFrame、Dataset和SQL在速度上都是相同的。如果使用RDD推荐使用Java和Scala,因为Python运行时大量数据将被序列化到Python进程或从Python进程序列化出来
集群配置
- 开启动态分配,资源不再适用时资源返回给集群
调度
- 开启并行度设置,公平分配资源
静息数据(数据的存储)
- 选择Parquet等格式存储文件
- 选择可拆分的压缩方式
- 表进行分区与分桶
- 注意小文件问题
- 数据局部性,哪些节点存哪些数据,进而减少网络交换
- 成本优化器设置
内存压力和垃圾收集
- 当应用程序在执行过程中占用过多内存,或者垃圾回收运行过于频繁,或者在JVM中创建了大量对象,而垃圾回收机制没有及时回收,就会产生内存压力大的情况。
- 尽可能使用结构化API,他们不会生成JVM对象
- 如果任务完成前多次调用完整的垃圾回收,应该减少Spark用于缓存的内存大小(Spark.Memory.fraction)
直接性能优化
并行度
- spark.default.parallelism
过滤优化
- 将过滤器尽可能地提前
- 进行必要的重分区与合并后
- 自定义分区
UDF
- 避免使用UDF是一个很好的优化策略,因为他们强制将数据表示为JVM中的对象
缓存
- 对于频繁访问的数据,使用cache方法缓存数据集
连接
- 尽可能进行等值连接
- 让过滤发生在内部连接中
聚合
- 使用reduceByKey而不是groupByKey
网友评论