美文网首页
第七章 处理函数

第七章 处理函数

作者: 井底蛙蛙呱呱呱 | 来源:发表于2022-12-17 19:54 被阅读0次

    Flink本身提供了多层API,前面介绍的DataStream API只是其中的一环。


    在前面的章节介绍了诸多Flink提供的算子(如map、filter、widow等)。除了使用这些已有的算子,我们也可以自定义算子,也即本章的处理函数(process function)。

    基本处理函数(ProcessFunction)

    之前讲的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限,如map算子,只能获取到当前的数据。而窗口聚合算子AggregateFuntion以及富函数(如RichMapFunction),可以拿到上下文状态等信息。但是无论那种算子,想要访问时间戳,或者当前水位线信息,都是完全做不到的,这时就需要处理函数(ProcessFunction)。

    处理函数提供了一个“定时服务”(TimeService),我们可以通过它访问流中的时间(event),时间戳(timestamp),水位线(Watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数汉可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个DataStream API的最底层基础。

    处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。stream.process(new MyProcessFunction)

    一个梨子:

    package com.whu.chapter07
    
    import com.whu.chapter05.{ClickSource, Event}
    import org.apache.flink.streaming.api.functions._
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    object ProcessFunctionDemo {
      def main(args:Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        env.addSource(new ClickSource())
          .assignAscendingTimestamps(_.timeStamp)
          .process(new ProcessFunction[Event, String] {
            // 每来一条元素都会调用一次
            override def processElement(i:Event, context:ProcessFunction[Event,String]#Context, collector: Collector[String]):Unit = {
              if(i.user.equals("Mary")){
                // 向下游发送数据
                collector.collect(i.user)
              }else if(i.user.equals("bob")){
                collector.collect(i.user)
              }
              // 打印当前水位线
              println(context.timerService.currentWatermark())
            }
          })
          .print()
    
        env.execute()
      }
    }
    

    ProcessFunction解析:

    public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
      ...
      public abstract void processElement(I value, Context ctx, Collector<O> out) 
    throws Exception;
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
    throws Exception {}
      ...
    }
    

    processElement方法用于处理原始,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及收集器(Collector)out。方法没有返回值,处理之后的输出数据通过收集器的out定义。

    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致;
    • ctx: 类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法output();
    • out: “收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap()算子中的收集器完全一样,直接调用out.collect()方法就可以像下游发出一个数据。这个方法可多次调用,也可以不调用。

    onTimer用于定时触发的操作,这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

    在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。

    处理函数的分类

    Flink提供了8个不同的处理函数:

    • ProcessFunction,最基本的处理函数,基于 DataStream 直接调用 process()时作为参数传入;
    • KeyedProcessFunction,对流按键分区后的处理函数,基于 KeyedStream 调用 process()时作为参数传入。要想使用定时器,必须基于 KeyedStream;
    • ProcessWindowFunction,开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process()时作为参数传入;
    • ProcessAllWindowFunction,同样是开窗之后的处理函数,基于 AllWindowedStream 调用 process()时作为参数传入;
    • CoProcessFunction,合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process()时作为参数传入;
    • ProcessJoinFunction,间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process()时作为参数传入;
    • BroadcastProcessFunction,广播连接流处理函数,基于 BroadcastConnectedStream 调用 process()时作为参数传入;
    • KeyedBroadcastProcessFunction,按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用 process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream
      与广播流(BroadcastStream)做连接之后的产物。

    按键分区处理函数(KeyedProcessFunction)

    在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。

    package com.whu.chapter07
    
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import com.whu.chapter05.{ClickSource, Event}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    import java.sql.Timestamp
    
    object KeyedProcessFunctionDemo {
      def main(args: Array[String]):Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        env.addSource(new ClickSource())
          .keyBy(r=>true)
          .process(new KeyedProcessFunction[Boolean, Event, String]{
            override def processElement(i: Event, context: KeyedProcessFunction[Boolean, Event, String]#Context, collector: Collector[String]): Unit = {
              val currTs = context.timerService.currentProcessingTime()
              collector.collect("数据到达时间:"+new Timestamp(currTs))
              // 注册10秒钟之后的处理时间定时器
              context.timerService().registerProcessingTimeTimer(currTs+10*1000L)
            }
    
            // 定时器逻辑
            override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
              out.collect("定时器触发时间:"+new Timestamp(timestamp))
            }
          }).print()
    
        env.execute()
      }
    }
    

    窗口处理函数

    窗 口 处 理 函 数 ProcessWindowFunction 的 使 用 与 其 他 窗 口 函 数 类 似 , 也 是 基 于WindowedStream 直接调用方法就可以,只不过这时调用的是 process()。

    ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    ...
      public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
      public void clear(Context context) throws Exception {}
      public abstract class Context implements java.io.Serializable {...}
    }
    

    其有四个类型的参数:

    • IN: input, 数据流中窗口任务的输入数据类型;
    • OUT: output, 窗口任务进行计算之后的输出数据类型;
    • KEY: 数据中key的类型;
    • W:窗口的类型,是window的子类型,一般情况下我们定义时间窗口,w就是TimeWindow;

    由于全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是 processElement(),
    而是改成了 process()。方法包含四个参数:

    • key:窗口做统计计算基于的键,也就是之前keyBy()用来区分的字段;
    • context:当前窗口进行计算的上下文,他的类型就是ProcessWindowFunction内部定义的抽象类context;
    • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型;
    • out:用来发送数据输出计算结果的收集器,类型为Collector;

    一个计算网页浏览量topN的例子:

    package com.whu.chapter07
    
    
    import org.apache.flink.streaming.api.functions._
    import com.whu.chapter05.{ClickSource, Event}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.collection.JavaConverters._
    import java.sql.Timestamp
    import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
    import scala.collection.mutable.ListBuffer
    
    object ProcessWindowFunctionDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val stream = env.addSource(new ClickSource())
          .assignAscendingTimestamps(_.timeStamp)
    
        // 只需要url就可以统计数量,所以抽取url转换成String,直接开窗统计
        stream.map(_.url)
          // 开窗
          .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
            override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
              // 初始化一个map,key为url,value为url的pv数据
              val urlCountMap = Map[String, Long]()
              // 将url和pv数据写入Map中
              elements.foreach(r=>urlCountMap.get(r) match {
                case Some(count) => urlCountMap.put(r, count+1L)
                case None=> urlCountMap.put(r, 1L)
              })
              
              // 将Map中的键值对转换成列表数据结构
              // 列表中的元素是(K,V)元组
              var mapList = new ListBuffer[(String, Long)]()
              urlCountMap.keys.foreach(key=>
                urlCountMap.get(key) match {
                  case Some(count) => mapList+=((key, count))
                  case None => mapList
                }
              )
              // 按照浏览量进行降序排列
              mapList.sortBy(-_._2)
              // 输出
              val result = new StringBuilder
              result.append("=======\n")
              for(i <- 0 to 1){
                val temp = mapList(i)
                result.append("浏览量No."+(i+1)+" ")
                  .append("url:"+temp._1+" ")
                  .append("浏览量No."+(i+2)+" ")
                  .append("url:"+temp._2+" ")
                  .append("窗口结束时间是:"+ new Timestamp((context.window.getEnd))+"\n")
              }
            }
          })
          .print()
        
        env.execute()
      }
    }
    

    相关文章

      网友评论

          本文标题:第七章 处理函数

          本文链接:https://www.haomeiwen.com/subject/esksqdtx.html