Flink 计算 TopN

作者: shengjk1 | 来源:发表于2020-12-28 14:44 被阅读0次

    前言

    使用 Flink 很长一段时间了,突然发现竟然没有计算过 TopN,这可是 Flink 中很常见的计算场景,故自己想了一个场景来计算一下。
    基于 Flink 1.12

    场景

    外卖员听单的信息会发到一个单独的 topic 中,需要统计每天有多少个外卖员听单,以及总共的听单次数。

    kafka 中消息类型

    {"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}
    

    locTime:事件发生的时间;courierId:外卖员 id

    计算一天中 听单次数 top5 的外卖员

    代码

    // Kafka source that reads raw JSON strings from the configured topics.
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
            FlinkHelp.setOffset(parameter, consumer);
            // Event time comes from the "locTime" field of each JSON record. Timestamps are
            // treated as monotonously increasing, and a source split that stays silent for
            // one second is marked idle.
            consumer.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                                // DateTimeFormatter is not Serializable, so it must not be shipped with
                                // this (serializable) assigner; it is created lazily on first use.
                                private transient DateTimeFormatter locTimeFormatter;

                                /**
                                 * Extracts the event timestamp (epoch milliseconds) from the record's
                                 * "locTime" field, which is expected in "yyyy-MM-dd HH:mm:ss" format.
                                 *
                                 * @param element         raw JSON record
                                 * @param recordTimestamp timestamp previously attached to the record (unused)
                                 * @return the "locTime" value as epoch milliseconds
                                 * @throws IllegalArgumentException if the record is not valid JSON or has
                                 *                                  no "locTime" field
                                 */
                                @Override
                                public long extractTimestamp(String element, long recordTimestamp) {
                                    final String locTime;
                                    try {
                                        Map<String, Object> map = Json2Others.json2map(element);
                                        Object rawLocTime = map.get("locTime");
                                        if (rawLocTime == null) {
                                            throw new IllegalArgumentException("Record has no locTime field: " + element);
                                        }
                                        locTime = rawLocTime.toString();
                                    } catch (IOException e) {
                                        // Fail with the real cause instead of swallowing it and then
                                        // failing later on an empty timestamp string.
                                        throw new IllegalArgumentException("Cannot parse record as JSON: " + element, e);
                                    }
                                    if (locTimeFormatter == null) {
                                        locTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                                    }
                                    LocalDateTime startDateTime = LocalDateTime.parse(locTime, locTimeFormatter);
                                    // NOTE(review): the offset is the JVM's current default offset, so the
                                    // result depends on the time zone of the machine running this code -
                                    // confirm it matches the zone in which locTime is produced.
                                    return startDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
                                }
                            }).withIdleness(Duration.ofSeconds(1)));
    
            // Stage 1: count listen events per (day, courier) inside a one-day event-time window
            // and emit the running count each time the window fires.
            SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer)
                    .filter(new FilterFunction<String>() {
                        // Accepts every record unchanged.
                        @Override
                        public boolean filter(String record) throws Exception {
                            return true;
                        }
                    })
                    .keyBy(new KeySelector<String, String>() {
                        // Key format: "<yyyyMMdd>-<courierId>", both taken from the JSON record.
                        @Override
                        public String getKey(String record) throws Exception {
                            Map<String, Object> fields = Json2Others.json2map(record);
                            String courierId = fields.get("courierId").toString();
                            String datePart = fields.get("locTime").toString().split(" ")[0];
                            String day = datePart.replace("-", "");
                            return day + "-" + courierId;
                        }
                    })
                    // One-day tumbling windows, shifted by -8 hours.
                    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                    .allowedLateness(Time.minutes(1))
                    // A CountTrigger.of(5) was tried here as well; when trigger() is called more
                    // than once, the later call simply overrides the earlier one.
                    // A ContinuousProcessingTimeTrigger.of(Time.seconds(10)) was also considered,
                    // but it is problematic when replaying historical data.
                    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
                    // Once the window function has run, drop the buffered elements from window state.
                    .evictor(TimeEvictor.of(Time.seconds(0), true))
                    .process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {
                        // Redis client: opened in open() and released in close().
                        private JedisCluster jedisCluster;
                        private ReducingStateDescriptor<Long> reducingStateDescriptor;
                        // Keyed state that receives one add(1L) per element seen by process().
                        private ReducingState<Long> listenCountState;

                        /**
                         * Registers the TTL-enabled "listenCount" reducing state and opens the
                         * Redis connection.
                         */
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            StateTtlConfig stateTtl = StateTtlConfig
                                    .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                                    // Default value; event-time TTL is not supported in 1.12.0.
                                    .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                                    .cleanupInRocksdbCompactFilter(1000)
                                    // Default value.
                                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                    .build();

                            reducingStateDescriptor = new ReducingStateDescriptor<Long>(
                                    "listenCount", new Sum(), TypeInformation.of(Long.class));
                            reducingStateDescriptor.enableTimeToLive(stateTtl);
                            listenCountState = getRuntimeContext().getReducingState(reducingStateDescriptor);

                            jedisCluster = RedisUtil.getJedisCluster(redisHp);
                        }

                        /** Releases the Redis connection. */
                        @Override
                        public void close() throws Exception {
                            RedisUtil.closeConn(jedisCluster);
                        }

                        /**
                         * Adds one to the count state for every element in this firing and emits
                         * the resulting value, with day and courier id read from the first element.
                         *
                         * @param s        window key
                         * @param context  window context (time, watermark, window bounds)
                         * @param elements records buffered for this firing
                         * @param out      collector for the per-courier count record
                         */
                        @Override
                        public void process(String s, Context context, Iterable<String> elements, Collector<CourierListenInfos> out) throws Exception {
                            TimeWindow currentWindow = context.window();
                            long processingTime = context.currentProcessingTime();
                            long currentWatermark = context.currentWatermark();

                            String windowEnd = DateUtils.millisecondsToDateStr(currentWindow.getEnd(), "yyyyMMdd HH:mm:ss");
                            String windowStart = DateUtils.millisecondsToDateStr(currentWindow.getStart(), "yyyyMMdd HH:mm:ss");

                            System.out.println("currentProcessingTime:" + processingTime + " watermark:" + currentWatermark + " windowTime:" + windowStart + "-" + windowEnd);

                            for (String ignored : elements) {
                                listenCountState.add(1L);
                            }

                            // All elements share the same key, so the first one supplies day and courier id.
                            Map<String, Object> firstRecord = Json2Others.json2map(elements.iterator().next());
                            String courierId = firstRecord.get("courierId").toString();
                            String day = firstRecord.get("locTime").toString().split(" ")[0].replace("-", "");
                            out.collect(new CourierListenInfos(day, courierId, listenCountState.get()));
                        }
                    });
    
            // Stage 2: per day, keep the latest count of each courier in map state and, on an
            // event-time timer, write the five couriers with the highest counts to Redis.
            process.keyBy(new KeySelector<CourierListenInfos, String>() {
                @Override
                public String getKey(CourierListenInfos value) throws Exception {
                    return value.getDay();
                }
            }).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {
                private JedisCluster jedisCluster;
                private MapStateDescriptor<String, Long> mapStateCountDescriptor;
                // "<day>#<courierId>" -> latest listen count received for that courier.
                private MapState<String, Long> courierInfoCountMapState;

                /**
                 * Registers the TTL-enabled "courierInfoCountMapState" map state and opens the
                 * Redis connection.
                 */
                @Override
                public void open(Configuration parameters) throws Exception {
                    StateTtlConfig ttlConfig = StateTtlConfig
                            .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                            // Default value; event-time TTL is not supported in 1.12.0.
                            .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                            .cleanupInRocksdbCompactFilter(1000)
                            // Default value.
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                            .build();

                    mapStateCountDescriptor =
                            new MapStateDescriptor<String, Long>("courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                    mapStateCountDescriptor.enableTimeToLive(ttlConfig);
                    courierInfoCountMapState = getRuntimeContext().getMapState(mapStateCountDescriptor);

                    jedisCluster = RedisUtil.getJedisCluster(redisHp);
                }

                /** Releases the Redis connection. */
                @Override
                public void close() throws Exception {
                    RedisUtil.closeConn(jedisCluster);
                }

                /**
                 * Stores the latest count for the courier and schedules a top-5 recomputation.
                 *
                 * @param value per-courier count record for the current day key
                 * @param ctx   context giving access to the timer service
                 * @param out   unused; results are written to Redis from onTimer
                 */
                @Override
                public void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {
                    courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
                    // Schedule the recomputation for the next whole second after the current
                    // watermark. The previous expression (watermark / 1000 + 1000) mixed seconds
                    // with milliseconds and produced a timestamp near 1970. Aligning to a second
                    // boundary also lets timers registered within the same second collapse into one.
                    long currentWatermark = ctx.timerService().currentWatermark();
                    ctx.timerService().registerEventTimeTimer(currentWatermark / 1000 * 1000 + 1000);
                }

                /**
                 * Recomputes the day's top 5 couriers by listen count, keeps only those entries
                 * in state, and writes them to the Redis hash "test_courier_tops" under the field
                 * "<day>-top5" as "courierId#count" pairs joined by commas, in ascending count order.
                 *
                 * @param timestamp timestamp of the firing timer
                 * @param ctx       timer context; its current key is the day
                 * @param out       unused
                 */
                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                    final int topN = 5;
                    String day = ctx.getCurrentKey();
                    // Min-heap on listen count: the head is always the smallest of the kept entries.
                    PriorityQueue<CourierListenInfos> courierListenInfos = new PriorityQueue<>(new Comparator<CourierListenInfos>() {
                        @Override
                        public int compare(CourierListenInfos o1, CourierListenInfos o2) {
                            // Long.compare avoids the truncation/overflow of casting a long difference to int.
                            return Long.compare(o1.listenCount, o2.listenCount);
                        }
                    });

                    for (Map.Entry<String, Long> entry : courierInfoCountMapState.entries()) {
                        String[] split = entry.getKey().split("#", -1);
                        courierListenInfos.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));
                        if (courierListenInfos.size() > topN) {
                            // Drop the current minimum so that at most topN entries remain.
                            courierListenInfos.poll();
                        }
                    }

                    // Keep only the surviving top entries in state.
                    courierInfoCountMapState.clear();
                    StringBuilder tops = new StringBuilder();
                    int size = courierListenInfos.size();
                    for (int i = 0; i < size; i++) {
                        CourierListenInfos courierListenInfos1 = courierListenInfos.poll();
                        System.out.println("courierListenInfos1 " + courierListenInfos1);
                        courierInfoCountMapState.put(courierListenInfos1.getDay() + "#" + courierListenInfos1.getCourierId(), courierListenInfos1.listenCount);
                        tops.append(courierListenInfos1.courierId).append("#").append(courierListenInfos1.listenCount);
                        if (i != size - 1) {
                            tops.append(",");
                        }
                    }
                    jedisCluster.hset("test_courier_tops", day + "-top5", tops.toString());
                    System.out.println("============");
                }
            }).setParallelism(1);
    

    结果样例

    '20201227-top5':'1#1111,2#2222,3#3333'
    '20201227-top5':'1#1111,2#2222,3#3333'

    相关文章

      网友评论

        本文标题:Flink 计算 TopN

        本文链接:https://www.haomeiwen.com/subject/lazznktx.html