美文网首页
使用广播变量

使用广播变量

作者: Jorvi | 来源:发表于2019-12-10 10:08 被阅读0次

1 广播变量

object BroadCastData {

  // Map(id, name)
  var idNameMap: Broadcast[scala.collection.mutable.Map[String, String]] = null


  def queryIdName(sparkContext: SparkContext, prop: Map[String, String]): Broadcast[scala.collection.mutable.Map[String, String]] = {
    var map: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()

    val connection: Connection = MysqlUtil.getConnection(prop)
    val sql: String = "SELECT id, name FROM demo"
    MysqlUtil.execQuery(connection, prop, sql, (rs: ResultSet) => {
      map += (rs.getString("id") -> rs.getString("name"))
    })

    if (null != idNameMap) {
      idNameMap.unpersist()
      idNameMap= null
    }

    synchronized {
      idNameMap= sparkContext.broadcast(map)
    }

    MysqlUtil.releaseResource(connection)

    return idNameMap
  }
}

2 使用

  stream.foreachRDD(rdd => {
    // 从广播变量中获取值
    val idNameMap: mutable.Map[String, String] = BroadCastData.queryIdName(rdd.sparkContext, prop).value

    rdd.flatMap(log => {
      convert2StatLog(log, idNameMap)
    })
  }

相关文章

  • 使用广播变量

    1 广播变量 2 使用

  • Spark的广播变量机制

    Spark广播变量 什么是广播变量? 在同一个Execute共享同一份计算逻辑的变量 广播变量使用场景 我现在要在...

  • spark使用广播变量

  • 广播变量和累加器

    1.广播变量 1.不使用广播变量的话,Driver每次发送task到executor端去执行,都会携带数据副本,这...

  • Spark中的共享变量---广播变量和累加器

    一.广播变量和累加器的作用累加器(集群规模之间的大变量):做Spark的全局统计使用广播变量(集群规模间的大常量)...

  • 广播变量

    从 mysql 读取数据作为广播变量时, 虽然有 checkpoint 但是 kill 掉任务后,重启程序会失败。

  • Spark原理图

    为什么使用广播变量因为一个变量在Driver端定义,如果执行计算需要传递到executor的task线程中获取变量...

  • 共享变量:广播变量

    一、使用场景如果我们要在分布式计算里面分发大对象(如:字典,集合,黑白名单等),由Driver端进行分发。如果这个...

  • flink 广播变量

    使用 注意 广播变量是只读状态 广播状态中事件的顺序在各个并发实例中可能不尽相同,因此不能依赖广播数据得顺序 所有...

  • Spark之广播变量

    什么是广播变量 广播变量:分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,...

网友评论

      本文标题:使用广播变量

      本文链接:https://www.haomeiwen.com/subject/vrmwgctx.html