美文网首页
数据分析实践 | flink | window函数处理篇

数据分析实践 | flink | window函数处理篇

作者: Sevsea | 来源:发表于2019-11-17 11:51 被阅读0次
    • 文章更新隔了太久,这其中自己也成长了很多。希望知识沉淀的更有深度。
    • 本文内容概述:细说flink中窗口处理函数的使用,只介绍一些自己使用过的,框架相同,函数太多了就不一个个介绍啦~

    0x01 从Aggregate开始

    接着上一篇,这是一个普通的aggregate+windowfunction的处理过程。

        val Pv_Bolt =
        Pv_Bolt_Before_result
          .keyBy(k=>k._1)
          .timeWindow(Time.minutes(1))
          .aggregate(new DummyAggregator(),new MyProcessWindowFunction()).name("Aggregate")
     
    

    这样的处理其实已经可以为没什么特殊需求的处理过程提供框架了,

    但是会在实际过程中遇到需要将数据打散之类的过程,例如windowfunction中处理打散数据之后如何再聚合回来?

    所以需要后续处理的过程。

    0x02 windowfunction后续处理

    接着上一个datastream做后续的处理,

    例如:这里举例统计某字段频率,每个Pv_Bolt的流的类型为Accumulator_Fre(需提前定义,可根据实际情况更换)

    
    case class Accumulator_Fre(
                                var url:String,
                                var str_Fre:Map[String,Int]//Map构成为Map[单词,单词出现数量]()
                               )
                                  
    ...                  
    //Pv_Bolt为已经做完数据打散处理的datastream
     val warning = Pv_Bolt
          .keyBy(k=>k.url)
          .timeWindow(Time.minutes(15))
          .reduce(
          (F1:Accumulator_Fre,F2:Accumulator_Fre)=> {
            var new_Fre = F1.str_Fre
            F2.str_Fre.foreach ( str_num => {
              if (F1.str_Fre.contains ( str_num._1 )) {
                new_Fre = new_Fre.updated ( str_num._1, F1.str_Fre.get ( str_num._1 ).get + str_num._2 )
              }
            } )
            new Accumulator_Fre ( F1.url, newip_Fre )
    
          },
          ( key: String,
            window:TimeWindow,
            #Context,
            input: Iterable[Accumulator_Fre],
            out: Collector[String])=>
          {
    
            var in = input.iterator.next ()
            val starttime = tranTimeToString(window.getStart.toString)
            val endtime = tranTimeToString(window.getEnd.toString)
            var str_Fre = in.str_Fre.filter(_._2>50).toSeq.sortBy(_._2).toMap //筛选规则
            if(str_Fre.size>0){
              content = starttime+"-"+endtime+" | " +in.url+"-> str_Fre : "+str_Fre+"\n\n"
              log.info(content)
              out.collect(content)
    
            }
            
          }
        ).name("reduce_window")
        .addSink(bk)//窗口内out结果外联处理,如kafka/redis等。
    ...
    

    相关文章

      网友评论

          本文标题:数据分析实践 | flink | window函数处理篇

          本文链接:https://www.haomeiwen.com/subject/uwrpictx.html