//1.1处理需要广播的数据
val tupleData = env.fromCollection(broadData)
val toBroadcastData = tupleData.map(tup=>{
Map(tup._1->tup._2)
})
val text = env.fromElements("zs","ls","ww")
val result = text.map(new RichMapFunction[String,String] {
//获取广播变量,坑
var listData: java.util.List[Map[String,Int]] = null
var allMap = Map[String,Int]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//注意指定类型,坑
this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
val it = listData.iterator()
while (it.hasNext){
val next = it.next()
allMap = allMap.++(next)
}
}
override def map(value: String) = {
val age = allMap.get(value).get
value+","+age
}
}).withBroadcastSet(toBroadcastData,"broadcastMapName")
注意
- 广播变量是只读状态
- 广播状态中事件的顺序在各个并发实例中可能不尽相同,因此不能依赖广播数据得顺序
- 所有 operator task 都会快照下他们的广播状态
在 checkpoint 时,所有的 task 都会 checkpoint 下他们的广播状态,并不仅仅是其中一个,即使所有 task 在广播状态中存储的元素是一模一样的。这是一个设计倾向,为了避免在恢复期间从单个文件读取而造成热点。然而,随着并发度的增加,checkpoint 的大小也会随之增加,这里会存在一个并发因子 p 的权衡。Flink 保证了在恢复/扩缩容时不会出现重复数据和少数据。在以相同或更小并行度恢复时,每个 task 会读取其对应的检查点状态。在已更大并行度恢复时,每个 task 读取自己的状态,剩余的 task (p_new-p_old)会以循环方式(round-robin)读取检查点的状态。
- 广播状态目前在运行时保存在内存中,因为当前,RocksDB 状态后端还不适用于 operator state。Flink 用户应该相应地为其应用程序配置足够的内存。
网友评论