美文网首页
使用广播变量

使用广播变量

作者: Jorvi | 来源:发表于2019-12-10 10:08 被阅读0次

    1 广播变量

    object BroadCastData {
    
      // Map(id, name)
      var idNameMap: Broadcast[scala.collection.mutable.Map[String, String]] = null
    
    
      def queryIdName(sparkContext: SparkContext, prop: Map[String, String]): Broadcast[scala.collection.mutable.Map[String, String]] = {
        var map: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
    
        val connection: Connection = MysqlUtil.getConnection(prop)
        val sql: String = "SELECT id, name FROM demo"
        MysqlUtil.execQuery(connection, prop, sql, (rs: ResultSet) => {
          map += (rs.getString("id") -> rs.getString("name"))
        })
    
        if (null != idNameMap) {
          idNameMap.unpersist()
          idNameMap= null
        }
    
        synchronized {
          idNameMap= sparkContext.broadcast(map)
        }
    
        MysqlUtil.releaseResource(connection)
    
        return idNameMap
      }
    }
    

    2 使用

      stream.foreachRDD(rdd => {
        // 从广播变量中获取值
        val idNameMap: mutable.Map[String, String] = BroadCastData.queryIdName(rdd.sparkContext, prop).value
    
        rdd.flatMap(log => {
          convert2StatLog(log, idNameMap)
        })
      }
    
    

    相关文章

      网友评论

          本文标题:使用广播变量

          本文链接:https://www.haomeiwen.com/subject/vrmwgctx.html