Flink Project Series 2: Real-Time Hot Items Statistics


Author: 只是甲 | Published 2021-11-11 11:14

    1. Project Analysis

    Basic requirements:
    Count the most popular items over the last hour, updated every 5 minutes
    Popularity is measured by the number of page views ("pv" behavior)

    Approach:
    From the full user-behavior stream, filter out the page-view ("pv") events and count them
    Use a sliding window: window size 1 hour, slide 5 minutes


    Key (partition) the stream by item ID


    Set up the time window


    A time window (timeWindow) covers a left-closed, right-open interval
    With a sliding window, the same record is assigned to several overlapping windows
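
    To make this concrete: with a 1-hour window sliding every 5 minutes, each event lands in 60 / 5 = 12 overlapping windows. A small standalone sketch (plain Java, no Flink dependency; the timestamp is a made-up example) that mirrors the window-start arithmetic of Flink's sliding event-time windows:

    public class SlidingWindowDemo {
        public static void main(String[] args) {
            long sizeMs = 60 * 60 * 1000L;     // window size: 1 hour
            long slideMs = 5 * 60 * 1000L;     // slide: 5 minutes
            long eventTs = 1511661600000L;     // example event timestamp (ms)

            // Start of the latest window that contains the event
            long lastStart = eventTs - (eventTs % slideMs);
            // Every window [start, start + size) with start > eventTs - size also contains it
            for (long start = lastStart; start > eventTs - sizeMs; start -= slideMs) {
                System.out.printf("[%d, %d)%n", start, start + sizeMs);    // left-closed, right-open
            }
        }
    }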



    Window aggregation


    Window aggregation strategy: add one for every incoming record



    Implement the AggregateFunction interface
    interface AggregateFunction<IN, ACC, OUT>
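
    Since AggregateFunction is a plain interface, the "add one per record" contract can be exercised without a cluster, which is handy for unit testing. A minimal sketch (CountAggDemo is a made-up name; section 3.2 implements the same logic as ItemCountAgg):

    import org.apache.flink.api.common.functions.AggregateFunction;

    public class CountAggDemo {
        public static void main(String[] args) {
            // IN = String (a behavior tag), ACC = Long (running count), OUT = Long
            AggregateFunction<String, Long, Long> countAgg = new AggregateFunction<String, Long, Long>() {
                @Override public Long createAccumulator() { return 0L; }
                @Override public Long add(String value, Long acc) { return acc + 1; }
                @Override public Long getResult(Long acc) { return acc; }
                @Override public Long merge(Long a, Long b) { return a + b; }
            };
            Long acc = countAgg.createAccumulator();
            for (String behavior : new String[]{"pv", "pv", "pv"}) {
                acc = countAgg.add(behavior, acc);
            }
            System.out.println(countAgg.getResult(acc));    // prints 3
        }
    }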

    Define the output structure: ItemViewCount(itemId, windowEnd, count)

    Implement the WindowFunction interface
    interface WindowFunction<IN, OUT, KEY, W extends Window>
      • IN: the input type, i.e. the accumulator type produced by the aggregate, Long
      • OUT: the type emitted after window aggregation, ItemViewCount(itemId: Long, windowEnd: Long, count: Long); windowEnd is the window's end time and also serves as the window's unique identifier
      • KEY: the key type, a Tuple here wrapping itemId, since the windows are keyed by itemId
      • W: the window type; w.getEnd() returns the window's end time


    Window aggregation example


    Gather the per-window results for ranking: keyBy("windowEnd")


    Stateful programming


    Final sorting and output: KeyedProcessFunction

    1. A low-level API for stateful streams
    2. KeyedProcessFunction processes each keyed sub-stream after partitioning
    3. Using windowEnd as the key guarantees that all records in a given sub-stream belong to the same time window
    4. Read the current stream's state from ListState, then sort the buffered data and emit it

    Use a ProcessFunction to define the processing logic of the KeyedStream
    After partitioning, each keyed stream has its own lifecycle

    1. open: initialization; state handles for the current stream can be obtained here
    2. processElement: invoked for every element in the stream
    3. onTimer: the callback that runs when a previously registered Timer fires; registering it at windowEnd + 1 means it fires as soon as the watermark passes windowEnd, i.e. once all of that window's data has arrived
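
    A minimal skeleton of that lifecycle, keyed and typed the same way as the full TopNHotItems in section 3.2 (the class name LifecycleSketch is made up for illustration):

    public static class LifecycleSketch extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        private ListState<ItemViewCount> listState;

        @Override
        public void open(Configuration parameters) {
            // 1. open: runs once per parallel instance; obtain state handles here
            listState = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("sketch-list", ItemViewCount.class));
        }

        @Override
        public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
            // 2. processElement: called for every record; buffer it and set a timer
            listState.add(value);
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            // 3. onTimer: fires once the watermark passes windowEnd; the window's
            //    data is now complete and can be sorted and emitted
            out.collect("window closed at " + (timestamp - 1));
        }
    }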

    2. pom File Configuration

    Dependencies:

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.1</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.1</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.1.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.19</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <!-- Table API 和 Flink SQL -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-csv</artifactId>
          <version>1.10.1</version>
        </dependency>
    

    3. Code

    3.1 POJO Classes

    UserBehavior

    package com.zqs.flink.project.hotitemanalysis.beans;
    
    /**
     * @author  只是甲
     * @date    2021-10-14
     * @remark  Defines the input type class
     */
    
    public class UserBehavior {
        // Private fields
        private Long userId;
        private Long itemId;
        private Integer categoryId;
        private String behavior;
        private Long timestamp;
    
        public UserBehavior() {
        }
    
        public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behavior = behavior;
            this.timestamp = timestamp;
        }
    
        public Long getUserId() {
            return userId;
        }
    
        public void setUserId(Long userId) {
            this.userId = userId;
        }
    
        public Long getItemId() {
            return itemId;
        }
    
        public void setItemId(Long itemId) {
            this.itemId = itemId;
        }
    
        public Integer getCategoryId() {
            return categoryId;
        }
    
        public void setCategoryId(Integer categoryId) {
            this.categoryId = categoryId;
        }
    
        public String getBehavior() {
            return behavior;
        }
    
        public void setBehavior(String behavior) {
            this.behavior = behavior;
        }
    
        public Long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId=" + userId +
                    ", itemId=" + itemId +
                    ", categoryId=" + categoryId +
                    ", behavior='" + behavior + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
    

    ItemViewCount

    package com.zqs.flink.project.hotitemanalysis.beans;
    
    /**
     * @author  只是甲
     * @date    2021-10-14
     * @remark  Defines the output type class
     */
    
    public class ItemViewCount {
        private Long itemId;
        private Long windowEnd;
        private Long count;
    
        public ItemViewCount() {
        }
    
        public ItemViewCount(Long itemId, Long windowEnd, Long count) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    
        public Long getItemId() {
            return itemId;
        }
    
        public void setItemId(Long itemId) {
            this.itemId = itemId;
        }
    
        public Long getWindowEnd() {
            return windowEnd;
        }
    
        public void setWindowEnd(Long windowEnd) {
            this.windowEnd = windowEnd;
        }
    
        public Long getCount() {
            return count;
        }
    
        public void setCount(Long count) {
            this.count = count;
        }
    
        @Override
        public String toString() {
            return "ItemViewCount{" +
                    "itemId=" + itemId +
                    ", windowEnd=" + windowEnd +
                    ", count=" + count +
                    '}';
        }
    }
    

    3.2 Hot Items: Pure Java Code

    HotItems

    package com.zqs.flink.project.hotitemanalysis;
    
    import com.zqs.flink.project.hotitemanalysis.beans.ItemViewCount;
    import com.zqs.flink.project.hotitemanalysis.beans.UserBehavior;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    
    import java.sql.Timestamp;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Properties;
    
    
    public class HotItems {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 2. Read the data and create a DataStream
            DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkProject\\src\\main\\resources\\UserBehavior.csv");
    
            //Properties properties = new Properties();
            //properties.setProperty("bootstrap.servers", "localhost:9092");
            //properties.setProperty("group.id", "consumer");
            //properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //properties.setProperty("auto.offset.reset", "latest");
    
            //DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("hotitems", new SimpleStringSchema(), properties));
    
    
    
    
            // 3. Convert to POJOs and assign timestamps and watermarks
            DataStream<UserBehavior> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new UserBehavior(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                        @Override
                        public long extractAscendingTimestamp(UserBehavior element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // 4. Key by item, open sliding windows, and aggregate to get each item's count per window
            DataStream<ItemViewCount> windowAggStream = dataStream
                    .filter(data -> "pv".equals(data.getBehavior()))    // keep only "pv" events
                    .keyBy("itemId")    // key by item ID
                    .timeWindow(Time.hours(1), Time.minutes(5))    // sliding window: 1 hour, sliding every 5 minutes
                    .aggregate(new ItemCountAgg(), new WindowItemCountResult());

            // 5. Collect all item counts for the same window, sort, and emit the top N
            DataStream<String> resultStream = windowAggStream
                    .keyBy("windowEnd")    // key by window end time
                    .process(new TopNHotItems(5));   // custom process function takes the top 5
    
            resultStream.print();
    
            env.execute("hot items analysis");
        }
    
        // Custom incremental aggregate function: add one per record
        public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long> {
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(UserBehavior value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }
    
        // Custom full-window function: wraps the pre-aggregated count with window metadata
        public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
                Long itemId = tuple.getField(0);
                Long windowEnd = window.getEnd();
                Long count = input.iterator().next();
                out.collect(new ItemViewCount(itemId, windowEnd, count));
            }
        }
    
        // Custom KeyedProcessFunction for top-N ranking
        public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
            // size of the top-N list
            private Integer topSize;
    
            public TopNHotItems(Integer topSize) {
                this.topSize = topSize;
            }
    
            // List state holding every ItemViewCount emitted for the current window
            ListState<ItemViewCount> itemViewCountListState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("item-view-count-list", ItemViewCount.class));
            }
    
            @Override
            public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
                // Buffer each incoming record in list state, and register an event-time timer
                itemViewCountListState.add(value);
                // The timer fires once the watermark passes windowEnd, i.e. the window's data is complete
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // Timer fired: all of this window's data has been collected; sort and emit
                ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());
                // The window is complete, so clear the state to avoid a leak
                itemViewCountListState.clear();

                // Sort by count in descending order (compareTo avoids the int overflow a subtraction could cause)
                itemViewCounts.sort((o1, o2) -> o2.getCount().compareTo(o1.getCount()));
    
                // Format the ranking into a String for printing
                StringBuilder resultBuilder = new StringBuilder();
                resultBuilder.append("===================================\n");
                resultBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");

                // Walk the list and output the top N
                for (int i = 0; i < Math.min(topSize, itemViewCounts.size()); i++) {
                    ItemViewCount currentItemViewCount = itemViewCounts.get(i);
                    resultBuilder.append("NO ").append(i + 1).append(":")
                            .append(" Item ID = ").append(currentItemViewCount.getItemId())
                            .append(" Popularity = ").append(currentItemViewCount.getCount())
                            .append("\n");
                }
                resultBuilder.append("===================================\n\n");
    
                // Throttle the output rate so results are readable
                Thread.sleep(1000L);
    
                out.collect(resultBuilder.toString());
            }
        }
    }
    

    Run log:


    3.3 Hot Items: Table API and Flink SQL Implementation

    HotItemsWithSql

    package com.zqs.flink.project.hotitemanalysis;
    
    import com.zqs.flink.project.hotitemanalysis.beans.UserBehavior;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class HotItemsWithSql {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 2. Read the data and create a DataStream
            DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkProject\\src\\main\\resources\\UserBehavior.csv");
    
            // 3. Convert to POJOs and assign timestamps and watermarks
            DataStream<UserBehavior> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new UserBehavior(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                        @Override
                        public long extractAscendingTimestamp(UserBehavior element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // 4. Create the table environment, using the Blink planner
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    
            // 5. Convert the stream to a table
            Table dataTable = tableEnv.fromDataStream(dataStream, "itemId, behavior, timestamp.rowtime as ts");
    
            // 6. Group and open windows
            // Table API
            Table windowAggTable = dataTable
                    .filter("behavior = 'pv'")
                    .window(Slide.over("1.hours").every("5.minutes").on("ts").as("w"))
                    .groupBy("itemId, w")
                    .select("itemId, w.end as windowEnd, itemId.count as cnt");
    
            // 7. Use the ROW_NUMBER over-window function to rank counts and get the top N
            // SQL
            DataStream<Row> aggStream = tableEnv.toAppendStream(windowAggTable, Row.class);
            tableEnv.createTemporaryView("agg", aggStream, "itemId, windowEnd, cnt");
    
            Table resultTable = tableEnv.sqlQuery("select * from " +
                    "  ( select *, ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num " +
                    "  from agg) " +
                    " where row_num <= 5 ");
    
            // Pure-SQL implementation
            tableEnv.createTemporaryView("data_table", dataStream, "itemId, behavior, timestamp.rowtime as ts");
            Table resultSqlTable = tableEnv.sqlQuery("select * from " +
                    "  ( select *, ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num " +
                    "  from ( " +
                    "    select itemId, count(itemId) as cnt, HOP_END(ts, interval '5' minute, interval '1' hour) as windowEnd " +
                    "    from data_table " +
                    "    where behavior = 'pv' " +
                    "    group by itemId, HOP(ts, interval '5' minute, interval '1' hour)" +
                    "    )" +
                    "  ) " +
                    " where row_num <= 5 ");
    
            tableEnv.toRetractStream(resultSqlTable, Row.class).print();
    
            env.execute("hot items with sql job");
    
    
        }
    }
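
    Note: ROW_NUMBER over an unbounded stream can retract earlier results (a later update for the same window may push an item out of the top 5), so the ranked table cannot be converted with toAppendStream; toRetractStream is used instead, and retracted rows are printed with a false flag.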
    

    Test log:


    3.4 Writing the File into Kafka

    In a real environment, Flink usually consumes directly from Kafka and then writes out the processed results.
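
    For reference, a minimal sketch of wiring the Kafka source into a job, mirroring the commented-out lines in HotItems above (the broker address and group id are placeholders):

    package com.zqs.flink.project.hotitemanalysis;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");    // placeholder broker
            properties.setProperty("group.id", "consumer");

            // Same source as the commented-out block in HotItems
            DataStream<String> inputStream = env.addSource(
                    new FlinkKafkaConsumer<String>("hotitems", new SimpleStringSchema(), properties));

            inputStream.print();
            env.execute("kafka source sketch");
        }
    }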

    KafkaProducerUtil

    package com.zqs.flink.project.hotitemanalysis;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;
    
    /**
     * @author 只是甲
     * @date   2021-10-15
     * @remark Reads the file and writes each line to Kafka
     */
    
    public class KafkaProducerUtil {
        public static void main(String[] args) throws Exception {
            writeToKafka("hotitems");
        }
    
        // Wrap the Kafka-writing logic in a reusable method
        public static void writeToKafka(String topic) throws Exception{
            // Kafka configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");
            properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // Create a Kafka producer
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            // Read the text file with a buffered reader
            BufferedReader bufferedReader = new BufferedReader(new FileReader("C:\\Users\\Administrator\\IdeaProjects\\FlinkProject\\src\\main\\resources\\UserBehavior.csv"));
            String line;
            while ((line = bufferedReader.readLine()) != null ){
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, line);
                // send each line with the producer
                kafkaProducer.send(producerRecord);
            }
            kafkaProducer.close();
    
    
        }
    }
    

    Test log:


    References:

    1. https://www.bilibili.com/video/BV1qy4y1q728
    2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_1431-%e7%83%ad%e9%97%a8%e5%ae%9e%e6%97%b6%e5%95%86%e5%93%81%e7%bb%9f%e8%ae%a1
    3. https://blog.csdn.net/qwdafedv/article/details/54691740
