spark 比较难得一个事情之一就是当在集群上执行代码的时候,变量和方法的范围和生命周期。Rdds 的操作 能够修改他们外部的变量这是一个持续混淆的原因。在下面的例子中,我们来看下使用foreach 方法来增加计数器代码,当然其他的一些操作也有类似的问题。
Example
思考如下一个简单的RDD元素的求和的问题,这个问题会根据你的执行是否在同一个JVM内而不同。一个常见的例子就是 运行spark在 local 模式的话(--master = local)和在一个集群上运行一个Spark的应用(比如 通过spark-submit 提交到YARN)
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Local vs. cluster 模式
以上代码的行为是不确定的,而且可能执行起来和你想的不同。为了执行job,Spark 将Rdd的操作切分成很多task。这些task 是被每一个executor执行的。在执行前,Spark会计算task的闭包。闭包就是那些对于executor可见的并且可以再RDD上进行计算的变量和方法。闭包是可以序列化并且被发送到每一个excutor上的。
被发送到executor上的闭包内的变量是拷贝的,因而,当couner 在 foreach 内的被引用的时候,它已经不是driver 节点上的counter了。 driver 节点上的counter 依然存在于内存中,但是已经对于executors 不可见了。这些executors 只能看到来自序列化的闭包的副本。因而,最终 counter的值将会一直是0,因为所有counter的操作都是引用的闭包内的值。
为了保证这些场景下 明确的行为,应该使用一个 Accumulator
.
网友评论