美文网首页
Flink Joining

Flink Joining

作者: Rex_2013 | 来源:发表于2020-09-08 14:04 被阅读0次

    前言

    数据流操作的一个常见需求是对两条数据流中的事件进行联结(connect)或Join。connect在前面Flink API
    文章中。
    Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:Window JoinInterval Join

    如果Flink内置的Join算子无法表达所需的Join语义,那么你可以通过CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction实现自定义的Join逻辑。

    注意,你要设计的Join算子需要具备高效的状态访问模式及有效的状态清理策略。

    Window Join

    顾名思义,Window Join需要用到Flink中的窗口机制。其原理是将两条输入流中的元素分配到公共窗口中并在窗口完成时进行Join(或Cogroup)。

    下面的例子展示了如何定义基于窗口的Join。

    input1.join(input2)
      .where(<KeySelector>)       // 为input1指定键值属性
      .equalTo(<KeySelector>)     // 为input2指定键值属性
      .window(<WindowAssigner>)      // 指定WindowAssigner
      [.trigger(...)]   // 选择性的指定Trigger
      [.evictor(...)]   // 选择性的指定Evictor
      .apply(<JoinFunction>)       // 指定JoinFunction
    

    有关语义的一些注意事项:

    • 使用两个流的元素的组成Window Join像一个inner-join,这意味着如果一个stream中的元素没有与另一流中要连接的Stream对应的元素,则不会发出该元素。
    • 那些joined的元素将以最大的timestamp(仍位于相应窗口中)作为timestamp。例如,以窗口[5, 10)为边界的窗口将导致连接的元素具有9作为其时间戳
    Window Join

    两条输入流都会根据各自的键值属性进行分区,公共窗口分配器会将二者的事件映射到公共窗口内(其中同时存储了两条流中的数据)。当窗口的计时器触发时,算子会遍历两个输入中元素的每个组合(叉乘积)去调用JoinFunction。同时你也可以自定义触发器或移除器。由于两条流中的事件会被映射到同一个窗口中,因此该过程中的触发器和移除器与常规窗口算子中的完全相同。

    除了对窗口中的两条流进行inner join,如果要实现两流的实现left/right join。你可以对它们进行Cogroup,只需将算子定义开始位置的join改为coGroup()即可。Join和Cogroup的总体逻辑相同,区别是:Join会为两侧输入中的每个事件对调用JoinFunction;而Cogroup中用到的CoGroupFunction会以两个输入的元素遍历器为参数,只在每个窗口中被调用一次。

    注意,对划分窗口后的数据流进行Join可能会产生意想不到的语义。例如,假设你为执行Join操作的算子配置了1小时的滚动窗口,那么一旦来自两个输入的元素没有被划分到同一窗口,它们就无法Join在一起,即使二者彼此仅相差1秒钟。

    Tumbling Window Join

    Tumbling Window Join

    如图所示,我们定义了一个大小为2毫秒的滚动窗口,该窗口的形式为[0,1], [2,3], ...。该图显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻转窗口中[6,7]什么也不发射,因为在绿色流中不存在要与橙色元素⑥和joined连接的元素。

    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
        .apply { (e1, e2) => e1 + "," + e2 }
    

    Sliding Window Join

    Sliding Window Join

    在此示例中,我们使用大小为2毫秒的滑动窗口,并将它们滑动1毫秒,从而形成滑动窗口[-1, 0],[0,1],[1,2],[2,3], …。x轴下方的连接元素是传递给JoinFunction每个滑动窗口的元素。在这里,您还可以看到例如橙色②与绿色③在窗口中如何结合[2,3],而没有与窗口中的任何事物结合[1,2]。

    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
        .apply { (e1, e2) => e1 + "," + e2 }
    

    Session Window Join

    Session Window Join

    在这里,我们定义了一个会话窗口连接,其中每个会话之间的间隔至少为1ms。共有三个会话,在前两个会话中,两个流中的联接元素都传递给JoinFunction。在第三个会话中,绿色流中没有元素,因此⑧和⑨不连接!

    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
     
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
        .apply { (e1, e2) => e1 + "," + e2 }
    

    Interval Join

    window Join 是基于窗口来做join。很多实际业务场景,一些动作是有先后顺序的。比如浏览和点击的行为,不可能是同时进行的。
    Interval Join的Join会对两条流中拥有相同键值以及彼此之间时间戳不超过某一指定间隔的事件进行Join。

    Interval Join

    在上面的示例中,我们将两个流“橙色”和“绿色”连接在一起,下限为-2毫秒,上限为+1毫秒。缺省情况下,这些界限是包容性的,但.lowerBoundExclusive()并.upperBoundExclusive可以应用到改变行为。

    使用更正式的符号,这将转化为
    orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

    如三角形所示。

    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream
        .keyBy(elem => /* select key */)
        .intervalJoin(greenStream.keyBy(elem => /* select key */))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction[Integer, Integer, String] {
            override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
             out.collect(left + "," + right); 
            }
          });
        });
    

    参考官网 interval-join

    相关文章

      网友评论

          本文标题:Flink Joining

          本文链接:https://www.haomeiwen.com/subject/npddektx.html