美文网首页Flink
Flink实战双流join之interval Join

Flink实战双流join之interval Join

作者: bigdata张凯翔 | 来源:发表于2021-03-14 14:45 被阅读0次
    Apache Flink.png

    通俗易懂篇:

    前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?

    interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。

    我们来看Flink官方的一张图。

    image

    我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。
    其中,上界和下界可以是负数,也可以是整数。Interval join目前只支持INNER JOIN。将连接后的元素传递给ProcessJoinFunction时,时间戳变为两个元素中最大的那个时间戳。

    注意:
    Interval Join只支持事件时间。

    package com.istudy.work;
    
    import com.istudy.bean.FactOrderItem;
    import com.istudy.bean.Goods;
    import com.istudy.bean.OrderItem;
    import com.istudy.streamsource.GoodsSource;
    import com.istudy.streamsource.OrderItemSource;
    import com.istudy.watermark.GoodsWatermark;
    import com.istudy.watermark.OrderItemWatermark;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    import java.math.BigDecimal;
    
    /**
     * @projectname: HaiStream
     * @description:
     * @author: Mr.Zhang
     * @create: 2021-03-14 14:35
     **/
    public class IntervalJoin {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 构建商品数据流
            SingleOutputStreamOperator<Goods> goodsDS = env.addSource(new GoodsSource(), TypeInformation.of(Goods.class))
                    .assignTimestampsAndWatermarks(new GoodsWatermark() {
                    });
            // 构建订单明细数据流
            SingleOutputStreamOperator<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                    .assignTimestampsAndWatermarks(new OrderItemWatermark());
    
            // 进行关联查询
            //todo 1、这里我们通过keyBy将两个流join到一起
            SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                    //todo 2、interval join需要设置流A去关联哪个时间范围的流B中的元素。
                    .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                    //todo 此处,我设置的下界为-1、上界为0,
                    .between(Time.seconds(-1), Time.seconds(0))
                    //todo  且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。
                    .upperBoundExclusive()
                    //todo process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中
                    .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                        @Override
                        public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                            FactOrderItem factOrderItem = new FactOrderItem();
                            factOrderItem.setGoodsId(right.getGoodsId());
                            factOrderItem.setGoodsName(right.getGoodsName());
                            factOrderItem.setCount(new BigDecimal(left.getCount()));
                            factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
    
                            out.collect(factOrderItem);
                        }
                    });
    
            factOrderItemDS.print();
    
            env.execute("Interval JOIN");
        }
    }
    
    image.png

    运行结果:


    image.png

    深挖原理篇:

    join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联。interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

    按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
    right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

    image.png
    interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。
    示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。
    clickRecordStream
      .keyBy(record -> record.getMerchandiseId())
      .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
      .between(Time.seconds(-30), Time.seconds(30))
      .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
          collector.collect(StringUtils.join(Arrays.asList(
            accessRecord.getMerchandiseId(),
            orderRecord.getPrice(),
            orderRecord.getCouponMoney(),
            orderRecord.getRebateAmount()
          ), '\t'));
        }
      })
      .print().setParallelism(1);
    

    由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

    interval join 的实现原理
    以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。

    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);
        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
            new IntervalJoinOperator<>(
                lowerBound,
                upperBound,
                lowerBoundInclusive,
                upperBoundInclusive,
                left.getType().createSerializer(left.getExecutionConfig()),
                right.getType().createSerializer(right.getExecutionConfig()),
                cleanedUdf
            );
        return left
            .connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, operator);
    }
    

    相关文章

      网友评论

        本文标题:Flink实战双流join之interval Join

        本文链接:https://www.haomeiwen.com/subject/dnuqcltx.html