美文网首页
Spark广播变量应用

Spark广播变量应用

作者: 扎西的德勒 | 来源:发表于2021-05-21 21:36 被阅读0次

    一、广播变量

    1、广播变量的优点

    不需要每个task带上一份变量副本,而是变成每个节点的executor存一份副本。这样的话, 就可以让变量产生的副本数量大大减少。

    2、广播变量的用法
    //将mapRdd广播后返回broadcastValue
    val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
    //获取广播变量的值
    val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
    
    3、广播变量的原理

    初始的时候,在Driver端有一个副本数据。广播变量后,task运行的时候,在使用副本数据前,首先在所在本地Executor对应的BlockManager中,尝试获取副本数据;如果本地没有,即从Driver端拉取副本数据,并且保存在所在本地的BlockManager中;此后这个Executor上所有的task,都会直接使用本地BlockManager中的副本数据。另Executor的BlockManager除了从Driver端拉取数据,也可能从其他节点的BlockManager中拉去副本数据。
    BlockManager:负责管理某个Executor对应的内存和磁盘的数据,尝试本地BlockManager中招map数据。

    4、优化说明

    假设有50个Executor,共1000个task;若每个map数据10M。默认情况下,1000个副本10M共10G数据。在集群中,通过网络传输,耗费10G的内存资源;如果使用了广播变量,50个Executor即50个副本10M共500M数据。而且Executor的BlockManager不一定都从Driver传输到本地,还可能从最近的节点的Executor的BlockManager中拉取数据,网络传输速度大大增加,传输数据大大减少。
    10G/500M=20倍,极大的提高了性能。

    二、代码实例

    1、准备数据
    //订单数据
    1001,20150710,p0001,2
    1002,20150710,p0002,3
    1002,20150710,p0003,3
    //产品数据
    p0001,xiaomi,1000,2
    p0002,appale,1000,3
    p0003,samsung,1000,4
    

    2、代码开发

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkBroadCast {
      def main(args: Array[String]): Unit = {
        //构造Spark程序执行环境
        val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
        //如果集群运行,则不需要设置setMaster("local[*]")
        val sc = new SparkContext(conf)
        //设置日志级别
        sc.setLogLevel("WARN")
        //创建RDD,读取产品信息数据
        //产品记录样例:p0001,xiaomi,1000,2
        val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
        val productMapRdd: Array[(String, String)] = productRdd.map(x => {
          (x.split(",")(0), x)
        }).collect()
    //    productMapRdd.foreach(println)
        /**
         * (p0001,p0001,xiaomi,1000,2)
         * (p0002,p0002,appale,1000,3)
         * (p0003,p0003,samsung,1000,4)
         */
        //将产品数据作为广播变量
        val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)
    
        //读取订单记录:1001,20150710,p0001,2
        val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
        //将订单记录按照分区处理
        val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
          //获取产品广播变量的数据并转换为map类型,目的是通过getOrElse获取产品数据
          val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
          //处理分区内的订单数据记录
          val finalStr = eachPartition.map(eachLine => {
            //将每条订单记录按照逗号拆分,返回集合类型
            val ordersGet: Array[String] = eachLine.split(",")
            //产品的map类型,通过key(订单的产品id)获取对应的产品记录,返回产品数据记录
            val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
            //订单记录拼接产品记录
            eachLine + "\t" + getProductStr
          })
          finalStr
        })
        productAndOrderRdd.foreach(println)
    
        /**
         * 1001,20150710,p0001,2    p0001,xiaomi,1000,2
         * 1002,20150710,p0002,3    p0002,appale,1000,3
         * 1002,20150710,p0003,3    p0003,samsung,1000,4
         */
    
    
        //关闭Spark环境
        sc.stop()
      }
    
    }
    

    三、注意事项

    • 能不能将一个RDD使用广播变量广播出去?
      不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
    • 广播变量只能在Driver端定义,不能在Executor端定义。
    • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
    • 当Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
    • 当Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

    相关文章

      网友评论

          本文标题:Spark广播变量应用

          本文链接:https://www.haomeiwen.com/subject/dhfgjltx.html