美文网首页
Spark 共享变量

Spark 共享变量

作者: wangdy12 | 来源:发表于2017-11-04 15:13 被阅读0次

通常,当把一个函数传递给Spark的操作时(例如mapreduce操作),函数可以使用在驱动程序中定义的变量,但是函数在不同的节点上执行,每个节点都需要对函数(闭包)内的每个变量进行拷贝发送,而在远程机器上变量的更新不会传播回到驱动程序。

为此,Spark提供了特殊类型的共享变量:广播变量(broadcast variables),在每个节点上的内存上都缓存一个值,累加器(accumulators),工作节点只能add,可以用于计数和求和等

Broadcast Variables

广播变量让程序更高效的发送大型的只读数据集,缓存到每个机器上,方便进行多次使用,不需要在每个task中都包含一份复制

Spark actions 是分阶段执行的,之间以shuffle操作划分。 Spark会自动广播同一阶段的任务所需的公共数据,对应的数据以序列化形式缓存,在运行任务之前反序列化。因此显式创建广播变量仅在跨多个阶段Stage的任务需要相同数据才需要,例如可以用来广播比较大的查找表,或者机器学习算法的特征向量

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

广播机制底层使用Orchestra,类似BitTorrent传输

具体实现类为TorrentBroadcast,driver端将串行化的数据切分成多个块,由driver端的BlockManager负责存储,在每个executor上,优先从本地BlockManager上读取,其次从driver端或者其他executor上远程获取数据。它会将获取到的数据放置在自己的BlockManager内,方便其他executor获取数据,避免了driver端同时发送多个广播变量的拷贝

Accumulators

用来聚合工作节点上的数值到驱动程序,用来计数求和等
Spark本身支持数字类型的累加器,例如DoubleAccumulatorLongAccumulatorCollectionAccumulator
AccumulatorV2是累加器的基类,可以通过继承添加对自定义类型的支持,子类需要实现其中的抽象方法,例如addmerge,使用自定义的累加器时要记得进行注册,因为内置的累加器在SparkContext创建时默认进行了注册

doubleAccumulator: DoubleAccumulator//创建一个没有名称的double类型累加器
longAccumulator(name: String): LongAccumulator//创建一个带名称的long类型累加器

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

  • 工作节点不能访问累加器的值value, 只能进行写操作add
  • 因为容错机制,部分节点可能需要重新计算,Spark确保action操作中的累加器,只会被更新一次,例如foreach,在转换操作中,一个任务可能因为重复执行多次更新累加器
  • driver端应该在action操作后再访问累加器的值,因为transformation操作是惰性求值,不会主动执行

相关文章

  • Spark-broadcast

    参见Spark相关--共享变量-广播变量-broadcast

  • 7.spark共享变量

    spark共享变量 1 Why Apache Spark2 关于Apache Spark3 如何安装Apache ...

  • Spark中使用Scala synchronized 并发加锁创

    摘要:Spark,Scala,synchronized executor端共享变量 Spark的rdd调用map或...

  • Spark 共享变量

    翻译 Spark 共享变量部分的官方文档(Spark 2.4.3)。 通常,当传递给 Spark 操作 (如 ma...

  • spark 共享变量

    关于累计器, 广播变量, 参考:http://blog.csdn.net/u013468917/article/d...

  • Spark 共享变量

    默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个tas...

  • Spark 共享变量

    通常,当把一个函数传递给Spark的操作时(例如map或reduce操作),函数可以使用在驱动程序中定义的变量,但...

  • Spark共享变量

    共享变量分类 共享变量官网解释 Normally, when a function passed to a Spa...

  • spark2原理分析-广播变量(Broadcast Variab

    概述本文介绍spark中Broadcast Variables的实现原理。 基本概念在spark中广播变量属于共享...

  • spark之广播变量&累加器

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。累加器...

网友评论

      本文标题:Spark 共享变量

      本文链接:https://www.haomeiwen.com/subject/rvhopxtx.html