美文网首页
第七章 处理函数

第七章 处理函数

作者: 井底蛙蛙呱呱呱 | 来源:发表于2022-12-17 19:54 被阅读0次

Flink本身提供了多层API,前面介绍的DataStream API只是其中的一环。


在前面的章节介绍了诸多Flink提供的算子(如map、filter、widow等)。除了使用这些已有的算子,我们也可以自定义算子,也即本章的处理函数(process function)。

基本处理函数(ProcessFunction)

之前讲的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限,如map算子,只能获取到当前的数据。而窗口聚合算子AggregateFuntion以及富函数(如RichMapFunction),可以拿到上下文状态等信息。但是无论那种算子,想要访问时间戳,或者当前水位线信息,都是完全做不到的,这时就需要处理函数(ProcessFunction)。

处理函数提供了一个“定时服务”(TimeService),我们可以通过它访问流中的时间(event),时间戳(timestamp),水位线(Watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数汉可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个DataStream API的最底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。stream.process(new MyProcessFunction)

一个梨子:

package com.whu.chapter07

import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.functions._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      .process(new ProcessFunction[Event, String] {
        // 每来一条元素都会调用一次
        override def processElement(i:Event, context:ProcessFunction[Event,String]#Context, collector: Collector[String]):Unit = {
          if(i.user.equals("Mary")){
            // 向下游发送数据
            collector.collect(i.user)
          }else if(i.user.equals("bob")){
            collector.collect(i.user)
          }
          // 打印当前水位线
          println(context.timerService.currentWatermark())
        }
      })
      .print()

    env.execute()
  }
}

ProcessFunction解析:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
  ...
  public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
  ...
}

processElement方法用于处理原始,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及收集器(Collector)out。方法没有返回值,处理之后的输出数据通过收集器的out定义。

  • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致;
  • ctx: 类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法output();
  • out: “收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap()算子中的收集器完全一样,直接调用out.collect()方法就可以像下游发出一个数据。这个方法可多次调用,也可以不调用。

onTimer用于定时触发的操作,这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。

处理函数的分类

Flink提供了8个不同的处理函数:

  • ProcessFunction,最基本的处理函数,基于 DataStream 直接调用 process()时作为参数传入;
  • KeyedProcessFunction,对流按键分区后的处理函数,基于 KeyedStream 调用 process()时作为参数传入。要想使用定时器,必须基于 KeyedStream;
  • ProcessWindowFunction,开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process()时作为参数传入;
  • ProcessAllWindowFunction,同样是开窗之后的处理函数,基于 AllWindowedStream 调用 process()时作为参数传入;
  • CoProcessFunction,合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process()时作为参数传入;
  • ProcessJoinFunction,间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process()时作为参数传入;
  • BroadcastProcessFunction,广播连接流处理函数,基于 BroadcastConnectedStream 调用 process()时作为参数传入;
  • KeyedBroadcastProcessFunction,按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用 process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream
    与广播流(BroadcastStream)做连接之后的产物。

按键分区处理函数(KeyedProcessFunction)

在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。

package com.whu.chapter07

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.sql.Timestamp

object KeyedProcessFunctionDemo {
  def main(args: Array[String]):Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      .keyBy(r=>true)
      .process(new KeyedProcessFunction[Boolean, Event, String]{
        override def processElement(i: Event, context: KeyedProcessFunction[Boolean, Event, String]#Context, collector: Collector[String]): Unit = {
          val currTs = context.timerService.currentProcessingTime()
          collector.collect("数据到达时间:"+new Timestamp(currTs))
          // 注册10秒钟之后的处理时间定时器
          context.timerService().registerProcessingTimeTimer(currTs+10*1000L)
        }

        // 定时器逻辑
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
          out.collect("定时器触发时间:"+new Timestamp(timestamp))
        }
      }).print()

    env.execute()
  }
}

窗口处理函数

窗 口 处 理 函 数 ProcessWindowFunction 的 使 用 与 其 他 窗 口 函 数 类 似 , 也 是 基 于WindowedStream 直接调用方法就可以,只不过这时调用的是 process()。

ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
...
  public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
  public void clear(Context context) throws Exception {}
  public abstract class Context implements java.io.Serializable {...}
}

其有四个类型的参数:

  • IN: input, 数据流中窗口任务的输入数据类型;
  • OUT: output, 窗口任务进行计算之后的输出数据类型;
  • KEY: 数据中key的类型;
  • W:窗口的类型,是window的子类型,一般情况下我们定义时间窗口,w就是TimeWindow;

由于全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是 processElement(),
而是改成了 process()。方法包含四个参数:

  • key:窗口做统计计算基于的键,也就是之前keyBy()用来区分的字段;
  • context:当前窗口进行计算的上下文,他的类型就是ProcessWindowFunction内部定义的抽象类context;
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型;
  • out:用来发送数据输出计算结果的收集器,类型为Collector;

一个计算网页浏览量topN的例子:

package com.whu.chapter07


import org.apache.flink.streaming.api.functions._
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import java.sql.Timestamp
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer

object ProcessWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)

    // 只需要url就可以统计数量,所以抽取url转换成String,直接开窗统计
    stream.map(_.url)
      // 开窗
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          // 初始化一个map,key为url,value为url的pv数据
          val urlCountMap = Map[String, Long]()
          // 将url和pv数据写入Map中
          elements.foreach(r=>urlCountMap.get(r) match {
            case Some(count) => urlCountMap.put(r, count+1L)
            case None=> urlCountMap.put(r, 1L)
          })
          
          // 将Map中的键值对转换成列表数据结构
          // 列表中的元素是(K,V)元组
          var mapList = new ListBuffer[(String, Long)]()
          urlCountMap.keys.foreach(key=>
            urlCountMap.get(key) match {
              case Some(count) => mapList+=((key, count))
              case None => mapList
            }
          )
          // 按照浏览量进行降序排列
          mapList.sortBy(-_._2)
          // 输出
          val result = new StringBuilder
          result.append("=======\n")
          for(i <- 0 to 1){
            val temp = mapList(i)
            result.append("浏览量No."+(i+1)+" ")
              .append("url:"+temp._1+" ")
              .append("浏览量No."+(i+2)+" ")
              .append("url:"+temp._2+" ")
              .append("窗口结束时间是:"+ new Timestamp((context.window.getEnd))+"\n")
          }
        }
      })
      .print()
    
    env.execute()
  }
}

相关文章

网友评论

      本文标题:第七章 处理函数

      本文链接:https://www.haomeiwen.com/subject/esksqdtx.html