美文网首页
flink 学习笔记 — 基于 Flink 实时数仓的简单实践

flink 学习笔记 — 基于 Flink 实时数仓的简单实践

作者: 飞不高的老鸟 | 来源:发表于2019-12-24 17:27 被阅读0次

    回顾

        Flink 因其高吞吐、低延时、有状态、高容错的特性越来越受到数据从业者的青睐,它弥补了Storm、Spark 的很多不足。作为一个实时计算的框架,Flink 在实时数仓中发挥着越来越重要的作用。这里将以简单的案例,实现一个简单的数仓数据流。

    简单分层

        我们知道,在离线数仓建设中,会把仓库结构分为不同的层次来存储不同的数据,离线数仓大体可以分为:ODS层、DWD层、DWS层、数据集市以及应用层。当然,数仓的建设的最终目的是提供稳定高可用的数据应用。这里实时数仓也可类比离线数仓做简单的分层。如下图,实时数仓同样可以分为:ODS层、DWD层、DWS层以及最终提供应用数据给各个服务使用。

    简单架构.png
    • 如上图,通常我们的数据来至服务中的 SDK 打点、不同业务方的业务库、以及服务中的日志文件。这些数据通常通过某种方式发送到 Kafka 中,这些数据就是所说的 ODS 层。当然,这里 mysql 中的数据可以通过 Flink 直接进行读取处理。事实上,mysql 中的数据通常是较小的维表,可以放在 Flink 中的状态中。也可通过 Flink 进行分发到其他地方。
    • ODS 层的数据通过 Flink 计算之后,会被粗粒度的加工成所需要的明细层以及部分维度表。
    • 在明细数据的基础上,通过某种规则计算得到我们需要的汇总层,事实上,所谓汇总层,也就是接近应用层的应用数据。

    简单案例实践

        这里通过一个简单的案例来说明一下整个实时流中的数据走向。需求是通过计算实时更新每个用户在应用中的点击量,然后将计算结果写入 redis 中。

    • 定义用户行为 POJO 类对象:
    package bean;
    
    import java.io.Serializable;
    
    // 用户行为数据
    public class UserAction implements Serializable {
        public String userId;
        public String articleId;
        public String action;
    
    
        public UserAction() {
        }
    
        public UserAction(String userId, String articleId, String action) {
            this.userId = userId;
            this.articleId = articleId;
            this.action = action;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public String getArticleId() {
            return articleId;
        }
    
        public void setArticleId(String articleId) {
            this.articleId = articleId;
        }
    
        public String getAction() {
            return action;
        }
    
        public void setAction(String action) {
            this.action = action;
        }
    
        @Override
        public String toString() {
            return "UserAction{" +
                    "userId='" + userId + '\'' +
                    ", articleId='" + articleId + '\'' +
                    ", action='" + action + '\'' +
                    '}';
        }
    }
    
    • 定义统计结果映射类对象:
    package bean;
    
    import java.io.Serializable;
    
    // 统计结果
    public class ActionStat implements Serializable {
        public String userId;
        public Long count;
    
        public ActionStat() {
        }
    
        public ActionStat(String userId, Long count) {
            this.userId = userId;
            this.count = count;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public Long getCount() {
            return count;
        }
    
        public void setCount(Long count) {
            this.count = count;
        }
    
        @Override
        public String toString() {
            return "ActionStat{" +
                    "userId='" + userId + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
    
    
    • 定义配置,因为这里我们使用到 Kafka 和 Redis,因此只需要添加他们的配置就 ok 了。
    package bean;
    
    public class CONSTANT {
    
        // 连接 kafka 相关信息
        public static String BROKERS = "";
        public static String GROUPID = "";
    
        // 连接 redis
        public static String REDIS_HOST = "";
        public static int REDIS_PORT = 6379;
        public static String PASSWORD = "";
    
    }
    
    
    • OK,在完成以上基础类对象之后,开始我们的程序启动类:
    package task;
    
    import bean.ActionStat;
    import bean.CONSTANT;
    import bean.UserAction;
    import com.alibaba.fastjson.JSONObject;
    import kafka.KafkaConsumer;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import util.MyRedisMapper;
    
    
    // 处理数据并写入到 redis
    public class SinkToRedis {
    
        public static void main(String[] args) throws Exception {
            // 初始化 flink 环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 从 kafka 获取数据
    
            String brokers;
            String groupId;
            String topic;
    
            ParameterTool param = ParameterTool.fromArgs(args);
            if (param.equals(null)) {
                brokers = param.get("brokers");
                groupId = param.get("groupId");
                topic = param.get("topic");
            } else {
                brokers = "";
                groupId = "";
                topic = "";
            }
    
            // 消费 kafka,接入数据源
            DataStream<String> dataStream = env.addSource(KafkaConsumer.consumer(brokers, groupId, topic));
    
            SingleOutputStreamOperator<ActionStat> userStat = dataStream.map(new MyMap())
                    .filter(user -> (user.userId != null && user.articleId != null && "AppClick".equals(user.action)))
                    .keyBy("userId")
                    .timeWindow(Time.milliseconds(5000))
                    .aggregate(new AggDiY());
    
            userStat.print();
    
            // 初始化 redis 配置
            FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                    .setHost(CONSTANT.REDIS_HOST)
                    .setPort(CONSTANT.REDIS_PORT)
                    .setPassword(CONSTANT.PASSWORD)
                    .setDatabase(0)
                    .build();
    
            userStat.addSink(new RedisSink<>(jedisPoolConfig, new MyRedisMapper(RedisCommand.SET)));
    
            env.execute("filnk-test");
        }
    
        // 按 userid 统计
        static class AggDiY implements AggregateFunction<UserAction, ActionStat, ActionStat> {
    
            Long count = 0L;
    
            @Override
            public ActionStat createAccumulator() {
                return new ActionStat();
            }
    
            @Override
            public ActionStat add(UserAction value, ActionStat accumulator) {
                accumulator.userId = value.userId;
                accumulator.count = ++count;
                return accumulator;
            }
    
            @Override
            public ActionStat getResult(ActionStat accumulator) {
                return accumulator;
            }
    
            @Override
            public ActionStat merge(ActionStat a, ActionStat b) {
                a.count = a.count + b.count;
                return a;
            }
        }
    
        // 按 userid 统计
        static class MyMap implements MapFunction<String, UserAction> {
    
            @Override
            public UserAction map(String value) throws Exception {
    
                JSONObject jsonObject = JSONObject.parseObject(value);
                JSONObject content = JSONObject.parseObject(jsonObject.getString("content"));
                if (jsonObject.getString("content") != null) {
                    JSONObject properties = JSONObject.parseObject(content.getString("properties"));
                    String userId = properties.getString("userId");
                    String articleId = properties.getString("article_id");
                    String action = content.getString("event");
    
                    UserAction us = new UserAction(userId, articleId, action);
                    return us;
                }
                return null;
            }
        }
    }
    

        这里简单实现了从数据 ODS 层(也即是 Kafka)获取数据,通过 Flink 计算处理,将想要的结果最终写入 Redis 中供其他应用使用。当然,这里没有涉及到复杂的明细表以及维表的关联等操作。后续会不断的进行完善。

    小结

        实时数仓是目前 Flink 应用较多的场景,理解整个数据流的走向是建设实时数仓的基础。在我们不断的使用中,会逐渐的用到明细表、维表,同时也会用到较为难懂的状态。当然,随着开发者能力的不断提升,完成这些亦会逐渐的得心应手。

    相关文章

      网友评论

          本文标题:flink 学习笔记 — 基于 Flink 实时数仓的简单实践

          本文链接:https://www.haomeiwen.com/subject/dhufoctx.html