美文网首页
Spark中广播变量和累加器

Spark中广播变量和累加器

作者: weare_b646 | 来源:发表于2019-04-20 08:21 被阅读0次

    一、前述

    Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。

    累机器相当于统筹大变量,常用于计数,统计。

    二、具体原理

    1、广播变量

    • 广播变量理解图
    image
    • 注意事项

    1、能不能将一个RDD使用广播变量广播出去?

    不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

    2、 广播变量只能在Driver端定义,不能在Executor端定义。

    3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

    4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

    5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("brocast")
    val sc = new SparkContext(conf)
       val list = List("hello","world")
        val broadCast = sc.broadcast(list)
        val lineRDD = sc.textFile("input/*")
        lineRDD.filter { x => {println(broadCast.value);broadCast.value.contains(x)} }.collect().foreach { println}
        sc.stop()
    
    

    2、累加器

    • 累加器理解图
    image image

    Scala代码:
    有问题

    object acculateDemo {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("sortDemo")
        val sc = new SparkContext(conf)
    
        var accumulator = 0
        sc.textFile("input/*",2).foreach {//两个变量
          x =>{accumulator += x.toInt
            println(accumulator)}}
        println(accumulator)
        sc.stop()
    
      }
    
    }
    
    

    正确

    import org.apache.spark.{SparkConf, SparkContext}
    
    object AccumulatorOperator {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("accumulator")
        val sc = new SparkContext(conf)
        val accumulator = sc.longAccumulator
        sc.textFile("./records.txt",2).foreach {//两个变量
          x =>{accumulator.add(1)
          println(accumulator)}}
        println(accumulator.value)
        sc.stop()
      }
    }
    
    

    结果:

    image
    package com.neusoft
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by ttc on 2018/10/17.
      */
    
    object acculateDemo {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local")
        val sc = new SparkContext(conf)
    
        val rdd = sc.textFile("/root/words.txt",10)
        val sum = sc.collectionAccumulator[String]
    
        rdd.map(x=>{
          sum.add(x)
        }).collect()
    
        println("sum is " + sum.value)
        sc.stop()
    
      }
    }
    
    

    注意事项

    累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

    相关文章

      网友评论

          本文标题:Spark中广播变量和累加器

          本文链接:https://www.haomeiwen.com/subject/nivjwqtx.html