美文网首页
spark-13-spark shell | webui | 内

spark-13-spark shell | webui | 内

作者: 西海岸虎皮猫大人 | 来源:发表于2020-10-01 19:37 被阅读0次

Application总共10 core的话,使用5个Executor每个2核要优于10个Executor每个1核,Executor启动需要时间

1 广播变量

当Executor中使用到了Driver的变量,不使用广播变量,有多少task就有多少变量副本;使用广播变量,每个Executor只有 一个副本

2 累加器

/**
  * 累加器 accumulator
  * 1.累加器只能在driver端定义 初始化
  * 2.累加器取值.value只能在driver端
  * 分布式没有全局变量的概念
  */
object PBroadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    // 初始值可以是对象(涉及自定义累加器)或者字符串
    val accumulator: Accumulator[Int] = sc.accumulator(0)
    val rdd = sc.textFile("./words")
    // 使用变量统计行数
//    var i = 0
    rdd.map(s=>{
      // executor端执行
      // i不回收到driver
//      i += 1
      accumulator.add(1)
      // 分区中取值无意义
//      println(accumulator)
      s
    }).collect()
    // i为driver端
//    println(i)
    println(accumulator.value)
  }
}

3 SparkShell

# 启动
./spark-shell --master spark://node-01:7877
# Spark context available as sc.
sc.textFile("hdfs://node-01:9820/spark/data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()

# 指定日志目录
./spark-shell --master spark://node-01:7877 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://node-01:8020/spark/log

# 配置文件中修改
cp spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf
--------------
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://node-01:8020/spark/log
# 配置历史日志
spark.history.fs.logDirectory    hdfs://node-01:8020/spark/log
# 日志压缩
spark.eventLog.compress true
--------------
# node-03启动日志服务器
./start-history-server.sh

4 shuffle

reduceByKey把每个key的value聚合,结果每个key对应一个value

shuffle read

上个stage相同的key写入同一分区文件,分区数取决于reduce task的并行度

shuffle write

reduce task寻找属于自己的分区文件

hash shuffle

map task按key的hash值写入buffer缓冲区,buffer 32k,每个buffer溢写磁盘小文件,shuffle完才聚合
文件数 = M * R
文件多 -> 频繁GC OOM 跨节点连接断掉几率增大

优化

每个core的executor共用buffer
文件数 = C * R

sort shuffle

为了解决hash shuffle小文件多的问题而引入
文件数 = 2 * M

sort shuffle by pass

不排序

5 shuffle寻址

image.png
MapOutputTracker

管理磁盘小文件,主(Driver)从(Executor)关系

BlockManager

管理块,主从

BlockManagerMaster(Driver)

  • DiskStore 管理磁盘
  • MemoryStore 管理内存
  • ConnectionManager 连接其他BlockManager
  • BlockTransferService 拉取数据

BlockManagerWorker(Executor)

  • DiskStore 管理磁盘
  • MemoryStore 管理内存
  • ConnectionManager 连接其他BlockManager
  • BlockTransferService 拉取数据

6 内存管理

静态内存管理

spark1.6之前是静态内存管理


静态内存管理

shuffle第一次内存会oom
shuffle第二次内存不够不会oom,会溢写磁盘

解决oom

  1. 减少数据拉取
  2. 增大executor总内存
统一内存管理
统一内存管理

shuffle和持久化内存可以互相借用
spark1.6默认统一内存管理

6 shuffle调优

建议提交任务时设置

增大shuffle buffer
增大read拉取的buffer
增加拉取重试次数
加长重试等待间隔
设置shuffle机制 hash | sort,是否排序(by pass)
hash shuffle合并文件

7 master ha

两种方式: FileSystem ZK
master挂掉不影响已经运行的task,但不能再提交任务
FileSystem需要手动切换恢复,推荐ZK

# node-01修改配置 conf/spark-env.sh
---------
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-02:2181,node-03:2181,node-04:2181 -Dspark.deploy.zookeeper.dir=/var/spark"
---------
# 配置发送到node-02 node-03

# node-02配置master地址 conf/spark-env.sh
------------
# master地址
export SPARK_MASTER_IP=node-02
------------

# node-01启动集群
# node-02单独启动master
./sbin/start-master.sh 
# kill node-01的master可以看到node-02由standby变为active

# ha提交任务时需要指定两台机器
./spark-submit --master spark://node-01:7877,node-02:7877 ...
# master切换不影响已经运行的task
# client模式连不上master会自动切换

相关文章

网友评论

      本文标题:spark-13-spark shell | webui | 内

      本文链接:https://www.haomeiwen.com/subject/qccmuktx.html