美文网首页
使用Flink对数据进行Join连接

使用Flink对数据进行Join连接

作者: 和平菌 | 来源:发表于2019-04-19 17:41 被阅读0次

Join数据是得有2个数据源情况下,用一定的条件连接2条数据,生成一个新的数据源
类似数据库中的Join查询

例如:我们有支付数据和下单的数据,我们需要通过orederId来关联到一起。

首先要对数据源进行处理,主要是进行分组(keyBy),保证orderId相同的数据是进到了同一个窗口内
数据源1是订单数据:

DataStream<Order> keyByedOrderSource = orderSource.keyBy(new OrderSerialKey());

static class OrderSerialKey implements KeySelector<Order, String> {
        private static final long serialVersionUID = 1L;
        @Override
        public String getKey(Order e) {
            if(e == null || e.getOrderId() == null){
                return "0";
            }
            String key = MD5Util.string2MD5(e.getOrderId().trim().toUpperCase());
            return key;
        }
    }

同样的要对支付消息进行keyBy操作

第二步就是直接在窗口内进行连接了

DataStream<Order> joinStream = keyByedOrderSource.join(keyByedPayinfoSource )
                .where(new OrderSerialKey())
                .equalTo(new PayinfoSerialKey())
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(JoinProcessTimeTrigger.of(Time.minutes(1)))
                .apply(new JoinFunction<Order, Payinfo, PayedOrder>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public String join(Order order, Payinfo payinfo) {
                        //构建一个PayedOrder
                        PayedOrder po = new PayedOrder();
                        po.setXXXX.....
                        return po;
                    }
                });

注意的是,当存在多个orderId相同的订单或者多个orderId相同的支付消息时,那么join出的结果就是cube计算。
例如订单A和订单B的orderId都是1,而支付消息X和支付消息Y的orderId都是1时,
那么就会join出来4条消息
A X
A Y
B X
B Y

这个时候如果不想要这样的结果,那么可以重写JoinFunction进行去重。

以上。

相关文章

网友评论

      本文标题:使用Flink对数据进行Join连接

      本文链接:https://www.haomeiwen.com/subject/gjcowqtx.html