美文网首页
Spark 之广播变量

Spark 之广播变量

作者: xiaoc024 | 来源:发表于2020-09-21 20:53 被阅读0次

    1. Background

    Spark 中有两种共享变量,其中一个是累加器,另一个是广播变量。前者解决了 Spark 闭包导致的局限性,如果不使用则会造成错误。后者更多时候是一种调优手段。了解广播变量的使用很多时候可以帮助我们提高程序的运行效率。

    2. Basic

    2.1 什么是广播变量?

    以下文字摘自官网:

    Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

    通过以上描述,总结出两个要点:

    • read-only variable each machine,not each tasks
    • effcient broadcast algorithms

    each machine,not each tasks 正是广播变量作为调优手段的本质原因。这样会节省很多网络 I/O。而广播变量是只读的原因也很好理解,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。而所谓的高效率广播算法指的是 TorrentBroadcast 使用的类似 BitTorrent 的技术,即种子 or P2P 下载技术。

    2.2 如何使用广播变量?

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    
    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)
    

    广播变量使用很简单,首先通过 SparkContext 发送广播变量,注意这里不能直接发送 RDD。然后在需要取值的地方读取 value 的值即可。

    2.3 使用场景

    • 多个 stage 公用数据,如配置文件

    • 单个 Executor 运行很多个 task

    • 数据需要被保存为非序列化的方式

    2.4 BlockManager

    广播变量底层的存储使用的 Spark 中一个核心组件 BlockManager,这里简要介绍一下。BlockManager 是一种和 HDFS类似,基于 master-slavers 架构的 kv 分布式存储,底层使用 netty 作为传输框架,数据读写和 hdfs 十分类似。

    3. Deep

    3.1 @transient lazy val 模式

    当使用 @transient lazy val 修饰一个变量是,意味着两点:1. 该变量所在类在序列化时,忽略该变量。 2.只有第一次调用该变量时,才会进行初始化。Broadcast 中的 value 便使用了这种模式,使得 broadcast 在序列化从 driver 传递到 executor 时,并不会序列化 value 。只有当第一次真正访问时才会通过 blockManager 机制进行初始化并读取。

    3.2 Broadcast 实现机制

    Driver 先建一个本地文件夹用以存放需要 broadcast 的 data,并启动一个可以访问该文件夹的 HttpServer。当调用val bdata = sc.broadcast(data)时就把 data 写入文件夹,同时写入 driver 自己的 blockManger 中(StorageLevel 为内存+磁盘),获得一个 blockId,类型为 BroadcastBlockId。当调用rdd.transformation(func)时,如果 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化得到 serialized task,注意序列化的时候不会序列化 bdata 中包含的 data(_value变量)。
    然后是 bdata 的序列化过程:Driver 先把 bdata 序列化到 byteArray,然后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB 设置)大小的 data block,同时会通知 driver 自己的 blockManagerMaster 说 meta 信息已经存放好。在代码中体现为 tellMaster = true。executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去调用 TorrentBroadcast.readObject()。这个方法首先得到 bdata 对象,然后发现 bdata 里面没有包含实际的 data。怎么办?先询问所在的 executor 里的 blockManager 是会否包含 data(通过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。否则,就通过本地 blockManager 去连接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。

    相关文章

      网友评论

          本文标题:Spark 之广播变量

          本文链接:https://www.haomeiwen.com/subject/yiebyktx.html