Flink Project Series 3: Real-Time Traffic Statistics

Author: 只是甲 | Published 2021-11-12 14:55

    1. Project Overview

    1.1 Module creation and data preparation

      Create a new package named NetworkFlowAnalysis.

      Copy the Apache server log file apache.log into the resource directory src/main/resources; we will read the data from there.

      Of course, we could also keep using UserBehavior.csv as the data source; in that case what we analyze is no longer every request made to the server, but the specific page-view ("pv") operations.

    1.2 Hot-page statistics based on server logs

      The module we are implementing now is "real-time traffic statistics". For an e-commerce platform, the traffic at the login entry point and the traffic on each page are important data worth analyzing, and both can be extracted straightforwardly from the web server's logs.

      Here we first implement the "hot page view count" statistic: read each line of the server log, count how many times users visit each URL within a period of time, then sort and output the result.

      Concretely: every 5 seconds, output the top N most-visited URLs of the last 10 minutes. This requirement is very similar to the earlier "real-time hot items" statistic, so we can largely reuse that code.
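
      In Flink 1.10's DataStream API this requirement maps directly onto a sliding event-time window. A minimal sketch, using the classes defined in section 3.2 below:

    dataStream
            .keyBy(ApacheLogEvent::getUrl)
            .timeWindow(Time.minutes(10), Time.seconds(5))  // 10-minute window, sliding every 5 seconds
            .aggregate(new PageCountAgg(), new PageCountResult());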

      Create a HotPages class under NetworkFlowAnalysis, and define the POJO class ApacheLogEvent under beans; this is the type of the input log stream. Also define PageViewCount, the output type of the window aggregation. In the main function, create and configure a StreamExecutionEnvironment, then read the data from the apache.log file and wrap it into ApacheLogEvent objects.

      Note that the timestamps in the raw log have the form "dd/MM/yyyy:HH:mm:ss", so we need a SimpleDateFormat to convert them into the epoch-millisecond timestamps we need:

    .map( line -> {
        String[] fields = line.split(" ");
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        Long timestamp = simpleDateFormat.parse(fields[3]).getTime();
        return new ApacheLogEvent(fields[0], fields[1], timestamp, fields[5], fields[6]);
    } )
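
      Each call above creates its own SimpleDateFormat, which sidesteps that class's thread-safety issues at the cost of one allocation per record. As an alternative sketch, the same parse can be done with the immutable, thread-safe java.time API (assuming the log times should be interpreted in the JVM's default time zone, as SimpleDateFormat.parse does):

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss");
    long timestamp = LocalDateTime.parse(fields[3], formatter)
            .atZone(ZoneId.systemDefault())   // interpret in the default zone
            .toInstant()
            .toEpochMilli();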
    

    2. POM Configuration

    The dependencies in the pom file are as follows:

    <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.1</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.1</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.1.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.19</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <!-- Table API 和 Flink SQL -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>1.10.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-csv</artifactId>
          <version>1.10.1</version>
        </dependency>
    </dependencies>

    3. Code

    3.1 POJO classes

    ApacheLogEvent

    package com.zqs.flink.project.networkflowanalysis.beans;
    
    public class ApacheLogEvent {
        private String ip;
        private String userId;
        private Long timestamp;
        private String method;
        private String url;
    
        public ApacheLogEvent(){
        }
    
        public ApacheLogEvent(String ip, String userId, Long timestamp, String method, String url) {
            this.ip = ip;
            this.userId = userId;
            this.timestamp = timestamp;
            this.method = method;
            this.url = url;
        }
    
        public String getIp() {
            return ip;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public Long getTimestamp() {
            return timestamp;
        }
    
        public String getMethod() {
            return method;
        }
    
        public String getUrl() {
            return url;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }
    
        public void setMethod(String method) {
            this.method = method;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        @Override
        public String toString() {
            return "ApacheLogEvent{" +
                    "ip='" + ip + '\'' +
                    ", userId='" + userId + '\'' +
                    ", timestamp=" + timestamp +
                    ", method='" + method + '\'' +
                    ", url='" + url + '\'' +
                    '}';
        }
    }
    

    PageViewCount

    package com.zqs.flink.project.networkflowanalysis.beans;
    
    public class PageViewCount {
        private String url;
        private Long windowEnd;
        private Long count;
    
        public PageViewCount(){
    
        }
    
        public PageViewCount(String url, Long windowEnd, Long count) {
            this.url = url;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public Long getWindowEnd() {
            return windowEnd;
        }
    
        public void setWindowEnd(Long windowEnd) {
            this.windowEnd = windowEnd;
        }
    
        public Long getCount() {
            return count;
        }
    
        public void setCount(Long count) {
            this.count = count;
        }
    
        @Override
        public String toString() {
            return "PageViewCount{" +
                    "url='" + url + '\'' +
                    ", windowEnd=" + windowEnd +
                    ", count=" + count +
                    '}';
        }
    }
    

    UserBehavior

    package com.zqs.flink.project.networkflowanalysis.beans;
    
    public class UserBehavior {
        // private fields
        private Long userId;
        private Long itemId;
        private Integer categoryId;
        private String behavior;
        private Long timestamp;
    
        public UserBehavior() {
        }
    
        public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behavior = behavior;
            this.timestamp = timestamp;
        }
    
        public Long getUserId() {
            return userId;
        }
    
        public void setUserId(Long userId) {
            this.userId = userId;
        }
    
        public Long getItemId() {
            return itemId;
        }
    
        public void setItemId(Long itemId) {
            this.itemId = itemId;
        }
    
        public Integer getCategoryId() {
            return categoryId;
        }
    
        public void setCategoryId(Integer categoryId) {
            this.categoryId = categoryId;
        }
    
        public String getBehavior() {
            return behavior;
        }
    
        public void setBehavior(String behavior) {
            this.behavior = behavior;
        }
    
        public Long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId=" + userId +
                    ", itemId=" + itemId +
                    ", categoryId=" + categoryId +
                    ", behavior='" + behavior + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
    

    3.2 Hot pages

    Code:
    HotPages

    package com.zqs.flink.project.networkflowanalysis;
    
    import com.zqs.flink.project.networkflowanalysis.beans.ApacheLogEvent;
    import com.zqs.flink.project.networkflowanalysis.beans.PageViewCount;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.net.URL;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    /**
     * @author  只是甲
     * @date    2021-10-18
     * @remark  hot pages (top-N URL statistics)
     */
    
    public class HotPages {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            // read the input file
            URL resource = HotPages.class.getResource("/apache.log");
            DataStream<String> inputStream = env.readTextFile(resource.getPath());
    
            DataStream<ApacheLogEvent> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(" ");
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                        Long timestamp = simpleDateFormat.parse(fields[3]).getTime();
                        return new ApacheLogEvent(fields[0], fields[1], timestamp, fields[5], fields[6]);
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                        @Override
                        public long extractTimestamp(ApacheLogEvent element) {
                            return element.getTimestamp();
                        }
                    });
    
            dataStream.print("data");
    
            // key by URL, open windows, and aggregate

            // define a side-output tag for late data
            OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late"){};

            SingleOutputStreamOperator<PageViewCount> windowAggStream = dataStream
                    .filter(data -> "GET".equals(data.getMethod()))     // keep only GET requests
                    .filter(data -> {
                        // filter out requests for static resources (css/js/png/ico)
                        String regex = "^((?!\\.(css|js|png|ico)$).)*$";
                        return Pattern.matches(regex, data.getUrl());
                    })
                    .keyBy(ApacheLogEvent::getUrl)      // key by URL
                    .timeWindow(Time.minutes(10), Time.seconds(5))      // 10-minute window, sliding every 5 seconds
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateTag)
                    .aggregate(new PageCountAgg(), new PageCountResult());
    
            windowAggStream.print("agg");
            windowAggStream.getSideOutput(lateTag).print("late");
    
            // collect the counts of the same window, sort, and emit the ranking
            DataStream<String> resultStream = windowAggStream
                    .keyBy(PageViewCount::getWindowEnd)
                    .process(new TopNHotPages(3));
    
            resultStream.print();
    
            env.execute("hot pages job");
        }
    
        // custom incremental aggregation function: counts events
        public static class PageCountAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
    
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(ApacheLogEvent value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }
    
        // custom window function: wraps the count into a PageViewCount with url and window end
        public static class PageCountResult implements WindowFunction<Long, PageViewCount, String, TimeWindow>{
    
            @Override
            public void apply(String url, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
                out.collect(new PageViewCount(url, window.getEnd(), input.iterator().next() ));
            }
        }
    
        // custom keyed process function: ranks the top N hot pages per window
        public static class TopNHotPages extends KeyedProcessFunction<Long, PageViewCount, String>{
            private Integer topSize;
    
            public TopNHotPages(Integer topSize){
                this.topSize = topSize;
            }
    
            // state: all PageViewCounts of the current window, keyed by url
            MapState<String, Long> pageViewCountMapState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                pageViewCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("page-count-map", String.class, Long.class));
            }
    
            @Override
            public void processElement(PageViewCount value, Context ctx, Collector<String> out) throws Exception {
                pageViewCountMapState.put(value.getUrl(), value.getCount());
                // timer at windowEnd + 1: fires once the watermark passes the window end, to emit the ranking
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
                // timer 1 minute (the allowed lateness) after the window end, to clear the state
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 60 * 1000L);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // if this is the cleanup timer (window end + 1 minute of allowed lateness), clear the state and return
                if ( timestamp == ctx.getCurrentKey() + 60 * 1000L ){
                    pageViewCountMapState.clear();
                    return;
                }
    
                ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(pageViewCountMapState.entries());
    
                pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() {
                    @Override
                    public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                        if(o1.getValue() > o2.getValue())
                            return -1;
                        else if(o1.getValue() < o2.getValue())
                            return 1;
                        else
                            return 0;
                    }
                });
    
                // format the result into a String
                StringBuilder resultBuilder = new StringBuilder();
                resultBuilder.append("=================================================\n");
                resultBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
    
                // iterate over the list and emit the top N
                for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++){
                    Map.Entry<String, Long> currentItemViewCount = pageViewCounts.get(i);
                    resultBuilder.append("NO ").append(i + 1).append(":")
                            .append(" Page URL = ").append(currentItemViewCount.getKey())
                            .append(" Views = ").append(currentItemViewCount.getValue())
                            .append("\n");
                }
                resultBuilder.append("=================================================\n\n");
    
                // throttle the output rate for readability
                Thread.sleep(1000L);
    
                out.collect(resultBuilder.toString());
            }
    
    
        }
    
    }
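
    As an aside, the static-resource filter does not have to be a regular expression; a simple suffix check is an equivalent (and arguably more readable) sketch:

    .filter(data -> {
        // drop requests for static resources (css/js/png/ico)
        String url = data.getUrl();
        return !(url.endsWith(".css") || url.endsWith(".js")
                || url.endsWith(".png") || url.endsWith(".ico"));
    })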
    

    Test output: (screenshot)

    3.3 Page views (PV)

    Code:
    PageView

    package com.zqs.flink.project.networkflowanalysis;
    
    import com.zqs.flink.project.networkflowanalysis.beans.UserBehavior;
    import com.zqs.flink.project.networkflowanalysis.beans.PageViewCount;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.net.URL;
    import java.util.Random;
    
    /**
     * @author  只是甲
     * @date    2021-10-18
     * @remark  page view (PV) count
     */
    
    public class PageView {
        public static void main(String[] args) throws Exception{
            // 1. create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 2. read the data and create a DataStream
            URL resource = PageView.class.getResource("/UserBehavior.csv");
            DataStream<String> inputStream = env.readTextFile(resource.getPath());
    
            // 3. convert to POJOs, assign timestamps and watermarks
            DataStream<UserBehavior> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                        @Override
                        public long extractAscendingTimestamp(UserBehavior element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // 4. naive version: map every pv event to a constant key and sum over 1-hour windows
            SingleOutputStreamOperator<Tuple2<String, Long>> pvResultStream0 =
                    dataStream
                    .filter(data -> "pv".equals(data.getBehavior()))        //  过滤pv行为
                    .map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                            return new Tuple2<>("pv", 1L);
                        }
                    })
                    .keyBy(0)   // key by the constant "pv" field
                    .timeWindow(Time.hours(1))      // 1-hour tumbling window
                    .sum(1);
    
            // improved parallel version: spread events over random keys to avoid data skew
            SingleOutputStreamOperator<PageViewCount> pvStream = dataStream.filter(data -> "pv".equals(data.getBehavior()))
                    .map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
                        @Override
                        public Tuple2<Integer, Long>  map(UserBehavior value) throws Exception {
                            Random random = new Random();
                            return new Tuple2<>(random.nextInt(10), 1L);
                        }
                    })
                    .keyBy(data -> data.f0)
                    .timeWindow(Time.hours(1))
                    .aggregate(new PvCountAgg(), new PvCountResult());
    
            // merge the per-partition counts for the same window
            DataStream<PageViewCount> pvResultStream = pvStream
                    .keyBy(PageViewCount::getWindowEnd)
                    .process(new TotalPvCount());
    
            pvResultStream.print();
    
            env.execute("pv count job");
        }
    
        // custom pre-aggregation function
        public static class PvCountAgg implements AggregateFunction<Tuple2<Integer, Long>, Long, Long>{
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(Tuple2<Integer, Long> value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }
    
        // custom window function
        public static class PvCountResult implements WindowFunction<Long, PageViewCount, Integer, TimeWindow>{
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
                out.collect( new PageViewCount(integer.toString(), window.getEnd(), input.iterator().next()));
            }
        }
    
        // custom process function: sums up the per-partition counts of the same window
        public static class TotalPvCount extends KeyedProcessFunction<Long, PageViewCount, PageViewCount>{
            // state: the running total count for the current window
            ValueState<Long> totalCountState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                totalCountState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("total-count", Long.class, 0L));
            }
    
            @Override
            public void processElement(PageViewCount value, Context ctx, Collector<PageViewCount> out) throws Exception {
                totalCountState.update( totalCountState.value() + value.getCount() );
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageViewCount> out) throws Exception {
                // the timer fires once the watermark passes the window end, so all partition counts have arrived; emit the total
                Long totalCount = totalCountState.value();
                out.collect(new PageViewCount("pv", ctx.getCurrentKey(), totalCount));
                // clear the state
                totalCountState.clear();
            }
        }
    }
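
    A note on the design: the two-stage version above is the standard remedy for key skew. The first stage spreads the pv events across ten random keys (0-9), so the hourly pre-aggregation runs in parallel instead of piling onto a single hot key; the second stage keys the partial counts by windowEnd and adds them up in TotalPvCount. The event-time timer registered at windowEnd + 1 fires only after the watermark passes the window end, at which point every partial count for that window has been received.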
    
    

    Test output: (screenshot)

    3.4 Unique visitors (UV)

    Code:
    UniqueVisitor

    package com.zqs.flink.project.networkflowanalysis;
    
    /**
     * @author  只是甲
     * @date    2021-10-18
     * @remark  unique visitor (UV) count
     */
    
    import com.zqs.flink.project.networkflowanalysis.beans.UserBehavior;
    import com.zqs.flink.project.networkflowanalysis.beans.PageViewCount;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.net.URL;
    import java.util.HashSet;
    
    public class UniqueVisitor {
        public static void main(String[] args) throws Exception {
            // 1. create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 2. read the data and create a DataStream
            URL resource = UniqueVisitor.class.getResource("/UserBehavior.csv");
            DataStream<String> inputStream = env.readTextFile(resource.getPath());
    
            // 3. convert to POJOs, assign timestamps and watermarks
            DataStream<UserBehavior> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                        @Override
                        public long extractAscendingTimestamp(UserBehavior element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // open a window and count the UV value
            SingleOutputStreamOperator<PageViewCount> uvStream = dataStream.filter(data -> "pv".equals(data.getBehavior()))
                    .timeWindowAll(Time.hours(1))
                    .apply(new UvCountResult());
    
            uvStream.print();
    
            env.execute("uv count job");
        }
    
        // custom all-window function
        public static class UvCountResult implements AllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{
            @Override
            public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<PageViewCount> out) throws Exception {
                // a Set stores every userId in the window and deduplicates automatically
                HashSet<Long> uidSet = new HashSet<>();
                for (UserBehavior ub: values)
                    uidSet.add(ub.getUserId());
                out.collect( new PageViewCount("uv", window.getEnd(), (long)uidSet.size()));
            }
        }
    }
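
    Collecting a whole hour of events and holding every userId in an in-memory HashSet works for this dataset, but at the scale of hundreds of millions of users the set alone would need gigabytes of heap per window. That memory pressure is what motivates the Redis-bitmap-plus-Bloom-filter variant in section 3.5, which reduces the per-window footprint to a fixed 64 MB bitmap.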
    

    Test output: (screenshot)

    3.5 Unique visitors with a Bloom filter

    Code:
    UvWithBloomFilter

    package com.zqs.flink.project.networkflowanalysis;
    
    /**
     * @author  只是甲
     * @date    2021-10-18
     * @remark  unique visitor (UV) count with a Bloom filter
     */
    
    
    import com.zqs.flink.project.networkflowanalysis.beans.UserBehavior;
    import com.zqs.flink.project.networkflowanalysis.beans.PageViewCount;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import redis.clients.jedis.Jedis;
    
    import java.net.URL;
    
    
    public class UvWithBloomFilter {
        public static void main(String[] args) throws Exception {
            // 1. create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 2. read the data and create a DataStream
            URL resource = UvWithBloomFilter.class.getResource("/UserBehavior.csv");
            DataStream<String> inputStream = env.readTextFile(resource.getPath());
    
            // 3. convert to POJOs, assign timestamps and watermarks
            DataStream<UserBehavior> dataStream = inputStream
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                        @Override
                        public long extractAscendingTimestamp(UserBehavior element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // open a window and count the UV value
            SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
                    .filter(data -> "pv".equals(data.getBehavior()))
                    .timeWindowAll(Time.hours(1))
                    .trigger( new MyTrigger() )
                .process( new UvCountResultWithBloomFilter() );
    
            uvStream.print();
    
            env.execute("uv count with bloom filter job");
        }
    
        // custom trigger
        public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{
            @Override
            public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                // every arriving element immediately fires the window computation and purges the window contents
                return TriggerResult.FIRE_AND_PURGE;
            }
    
            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    
            }
        }
    
        // a hand-rolled Bloom filter
        public static class MyBloomFilter {
            // bitmap size in bits; should be a power of two so that (cap - 1) works as a bit mask
            private Integer cap;
    
            public MyBloomFilter(Integer cap){
                this.cap = cap;
            }
    
            // a simple multiplicative string hash
            public Long hashCode(String value, Integer seed){
                Long result = 0L;
                for (int i = 0; i < value.length(); i++){
                    result = result * seed + value.charAt(i);
                }
                return result & (cap - 1);
            }
        }
    
        // custom process function: deduplicates userIds via a Redis bitmap
        public static class UvCountResultWithBloomFilter extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{
            // Jedis connection and Bloom filter
            Jedis jedis;
            MyBloomFilter myBloomFilter;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                jedis = new Jedis("10.31.1.122", 6379);
                myBloomFilter = new MyBloomFilter(1 << 29);     // 2^29 bits = 64 MB bitmap, sized for roughly 100 million users
            }
    
            @Override
            public void process(Context context, Iterable<UserBehavior> elements, Collector<PageViewCount> out) throws Exception {
                // store both the bitmap and the window's count in Redis, keyed by windowEnd
                Long windowEnd = context.window().getEnd();
                String bitmapKey = windowEnd.toString();
                // keep the count values in a Redis hash
                String countHashName = "uv_count";
                String countKey = windowEnd.toString();
    
                // 1. take the current userId (FIRE_AND_PURGE leaves exactly one element per invocation)
                Long userId = elements.iterator().next().getUserId();
    
                // 2. compute the offset in the bitmap
                Long offset = myBloomFilter.hashCode(userId.toString(), 61);
    
                // 3. use Redis GETBIT to check whether that bit is already set
                Boolean isExist = jedis.getbit(bitmapKey, offset);
    
                if ( !isExist ){
                    // not seen before: set the corresponding bit to 1
                    jedis.setbit(bitmapKey, offset, true);
    
                    // update the count stored in Redis
                    Long uvCount = 0L;  // initial count value
                    String uvCountString = jedis.hget(countHashName, countKey);
                    if ( uvCountString != null && !"".equals(uvCountString) )
                        uvCount = Long.valueOf(uvCountString);
                    jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1));
    
                    out.collect(new PageViewCount("uv", windowEnd, uvCount + 1));
                }
    
            }
    
            @Override
            public void close() throws Exception {
                super.close();
            }
        }
    
    }
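
    A back-of-the-envelope caveat: with a single hash function, the probability that a never-seen userId still hits an already-set bit after n insertions into an m-bit bitmap is roughly 1 - e^(-n/m). For n = 10^8 users and m = 2^29 ≈ 5.4 x 10^8 bits, that is about 1 - e^(-0.19) ≈ 17%, so the scheme above will undercount UV noticeably as the bitmap fills up. Production Bloom filters (e.g. Guava's BloomFilter) use several independent hash functions, which drives the false-positive rate far lower for the same bitmap size.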
    

    Test output: (screenshot)

