1 广播变量
/**
 * Driver-side holder for a broadcast id -> name lookup table loaded from MySQL.
 */
object BroadCastData {
  // Most recently published broadcast of Map(id -> name); null until the first
  // successful queryIdName call.
  var idNameMap: Broadcast[scala.collection.mutable.Map[String, String]] = null

  /**
   * Reloads the id -> name table from the `demo` table, publishes it as a new
   * broadcast variable and retires the previously published one.
   *
   * @param sparkContext context used to create the broadcast variable
   * @param prop         settings handed to MysqlUtil for connecting and querying
   * @return the newly created broadcast, which is also stored in [[idNameMap]]
   */
  def queryIdName(sparkContext: SparkContext, prop: Map[String, String]): Broadcast[scala.collection.mutable.Map[String, String]] = {
    val loaded: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
    val connection: Connection = MysqlUtil.getConnection(prop)
    try {
      val sql: String = "SELECT id, name FROM demo"
      // NOTE(review): presumably execQuery invokes the callback once per result row -- confirm in MysqlUtil.
      MysqlUtil.execQuery(connection, prop, sql, (rs: ResultSet) => {
        loaded += (rs.getString("id") -> rs.getString("name"))
      })
    } finally {
      // Release the connection even when the query fails, so it cannot leak.
      MysqlUtil.releaseResource(connection)
    }
    // Swap the shared reference under a single lock so concurrent refreshes
    // cannot interleave the unpersist of the old value with the publish of the new one.
    synchronized {
      val previous = idNameMap
      // Publish the new broadcast first: if broadcast() throws, the old value stays in place.
      idNameMap = sparkContext.broadcast(loaded)
      if (null != previous) {
        previous.unpersist()
      }
      idNameMap
    }
  }
}
2 使用
stream.foreachRDD(rdd => {
  // Refresh the lookup table on the driver and keep only the Broadcast handle here.
  // Calling .value on the driver and capturing the resulting map in the closure
  // would serialize the whole map with every task and bypass the broadcast.
  val idNameBroadcast = BroadCastData.queryIdName(rdd.sparkContext, prop)
  // NOTE(review): flatMap is a lazy transformation and its result is discarded here;
  // an action or output operation on the result is needed for it to run -- confirm intent.
  rdd.flatMap(log => {
    // Read the broadcast value inside the closure, i.e. where the task runs.
    convert2StatLog(log, idNameBroadcast.value)
  })
})
网友评论