美文网首页
使用FlinkCEP统计超时未支付的订单/用户触达

使用FlinkCEP统计超时未支付的订单/用户触达

作者: 和平菌 | 来源:发表于2020-03-14 20:51 被阅读0次

    需求:统计用户下单了但一段时间后没有支付的订单,用来触达用户。

    一、模拟一个数据源,用来模拟用户行为

    public class UserEvent implements Serializable {
    
        private String pin; //用户的id
        private String skuId; 商品的skuId
        private String action; //用户事件 0表示下单,1表示支付
    }
    
    public class RandomSource extends RichParallelSourceFunction<UserEvent> {
    
        private boolean isRun;
    
        private static List<UserEvent> events1 = new ArrayList<>();
        private transient int index = 0;
    
        @Override
        public void run(SourceContext<UserEvent> ctx) throws InterruptedException {
            while (isRun){
                if(index < events1.size()){
                    UserEvent event = events1.get(index % events1.size());
                    ctx.collect(event);
                    index++;
    //                System.out.println("send message:" + JSON.toJSONString(event));
                }
                Thread.sleep(1000);
            }
        }
    
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            isRun = true;
    
            addEvent(events1,"zhangsan", "1", "0");
            addEvent(events1,"lisi", "3", "0");
            addEvent(events1,"zhangsan", "1", "1");
    
        }
    
        private void addEvent(Collection<UserEvent> collections, String pin,String skuId, String action){
            UserEvent event = new UserEvent();
            event.setPin(pin);
            event.setSkuId(skuId);
            event.setAction(action);
            collections.add(event);
        }
    
    
        @Override
        public void cancel() {
            isRun = false;
        }
    }
    
    

    二、逻辑代码

    public class Rule6App {
    
        private static final OutputTag<UserEvent> timeOutTag = new OutputTag<>("timeOut", TypeInformation.of(UserEvent.class));
    
        public static void main(String[] args) throws Exception{
        
            //步骤1、定义数据源,给数据添加水印
            final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor<UserEvent>();
    
            StreamExecutionEnvironment env = StreamCommon.getStreamEnv(true, true);
            DataStream<UserEvent> dataStream = env.addSource(new RandomSource())
                    .setParallelism(1) //并行度设置成1,便于观察
                    .assignTimestampsAndWatermarks(extractor); //加水印
            //步骤2、定义Pattern,可以看成是一个事件链
            //我这里定义的意思是,先收到下单的消息,然后再收到支付的消息,事件链可以很长,也可以有
           //复杂的组合事件
            Pattern<UserEvent,UserEvent> pattern = Pattern.<UserEvent>begin("order")
                    .where(new SimpleCondition<UserEvent>() {
                        @Override
                        public boolean filter(UserEvent event) throws Exception {
                            return "0".equals(event.getAction());
                        }
                    })
                    .next("pay")
                    .where(new SimpleCondition<UserEvent>() {
                        @Override
                        public boolean filter(UserEvent event) throws Exception {
                            return "1".equals(event.getAction());
                        }
                    });
            
           //步骤3、进行事件匹配,在匹配前,需要对流进行KeyBy分组,确保每个单元里处理的是
           //同一个用户的订单
            PatternStream<UserEvent> patternStream = CEP.pattern(
                    dataStream.keyBy(UserEvent::getPin),
                    pattern.within(Time.seconds(10)));
        
          //步骤4、对匹配的结果进行分流
          //这里注意到select有3个参数,第一个是超时消息的容器,这里通过旁路进行输出
          //第二个参数里定义了超时的消息如何进行处理
          //第三个参数里定义里正常匹配到规则的消息如何进行处理
            SingleOutputStreamOperator<UserEvent> result = patternStream.select(timeOutTag, new PatternTimeoutFunction<UserEvent, UserEvent>() {
                @Override
                public UserEvent timeout(Map<String, List<UserEvent>> map, long l) throws Exception {
                    System.out.println("这是超时了的:" + JSON.toJSONString(map));
                    return map.get("order").get(0);
                }
            }, new PatternSelectFunction<UserEvent, UserEvent>() {
                @Override
                public UserEvent select(Map<String, List<UserEvent>> map) throws Exception {
                    System.out.println("这是完成的订单:" + JSON.toJSONString(map));
                    return map.get("pay").get(0);//这里Map有2个KEY,就是前面定义事件链的tag
                }
            });
          
            //步骤5、从旁路拿到结果流,完成超时触达
            DataStream<UserEvent> timeoutResult = result.getSideOutput(timeOutTag);
            timeoutResult.print();
            env.execute();
        }
    
    }
    
    

    三、需要注意的点
    1、必须要是EventTime
    尝试了ProcessTime取不到超时的结果,只能拿到匹配到规则的结果
    2、within的含义
    within在每一条消息到达时,为该消息开启一个定时器,当整个时间链都匹配到结果,则终止定时器,否则被视为超时。

    例如第一个条件要累计多次的时候,当满足累积多次后,会重新开始 计时来计算超时。
    3、.oneOrMore().where(...)
    满足条件后开始计时,再within限定的时间段内如果有满足条件的数据进来则修改计时器重新计时(类似SessionWindow)

    相关文章

      网友评论

          本文标题:使用FlinkCEP统计超时未支付的订单/用户触达

          本文链接:https://www.haomeiwen.com/subject/vsbnshtx.html