- 文章更新隔了太久,这其中自己也成长了很多。希望知识沉淀的更有深度。
- 本文内容概述:细说flink中窗口处理函数的使用,只介绍一些自己使用过的,框架相同,函数太多了就不一个个介绍啦~
0x01 从Aggregate开始
接着上一篇,这是一个普通的aggregate+windowfunction的处理过程。
val Pv_Bolt =
Pv_Bolt_Before_result
.keyBy(k=>k._1)
.timeWindow(Time.minutes(1))
.aggregate(new DummyAggregator(),new MyProcessWindowFunction()).name("Aggregate")
这样的处理其实已经可以为没什么特殊需求的处理过程提供框架了,
但是会在实际过程中遇到需要将数据打散之类的过程,例如windowfunction中处理打散数据之后如何再聚合回来?
所以需要后续处理的过程。
0x02 windowfunction后续处理
接着上一个datastream做后续的处理,
例如:这里举例统计某字段频率,每个Pv_Bolt的流的类型为Accumulator_Fre(需提前定义,可根据实际情况更换)
case class Accumulator_Fre(
var url:String,
var str_Fre:Map[String,Int]//Map构成为Map[单词,单词出现数量]()
)
...
//Pv_Bolt为已经做完数据打散处理的datastream
val warning = Pv_Bolt
.keyBy(k=>k.url)
.timeWindow(Time.minutes(15))
.reduce(
(F1:Accumulator_Fre,F2:Accumulator_Fre)=> {
var new_Fre = F1.str_Fre
F2.str_Fre.foreach ( str_num => {
if (F1.str_Fre.contains ( str_num._1 )) {
new_Fre = new_Fre.updated ( str_num._1, F1.str_Fre.get ( str_num._1 ).get + str_num._2 )
}
} )
new Accumulator_Fre ( F1.url, newip_Fre )
},
( key: String,
window:TimeWindow,
#Context,
input: Iterable[Accumulator_Fre],
out: Collector[String])=>
{
var in = input.iterator.next ()
val starttime = tranTimeToString(window.getStart.toString)
val endtime = tranTimeToString(window.getEnd.toString)
var str_Fre = in.str_Fre.filter(_._2>50).toSeq.sortBy(_._2).toMap //筛选规则
if(str_Fre.size>0){
content = starttime+"-"+endtime+" | " +in.url+"-> str_Fre : "+str_Fre+"\n\n"
log.info(content)
out.collect(content)
}
}
).name("reduce_window")
.addSink(bk)//窗口内out结果外联处理,如kafka/redis等。
...
网友评论