Building a Real-Time Data Warehouse with Flink SQL 1.10.0

Author: 卡戎li | Published 2020-03-15 23:29

1. Data Synchronization Plan

[Figure: overall data synchronization plan]

2. Technology Selection

2.1 Open-Source Frameworks vs. the Alibaba Cloud Stack

Category            Alibaba Cloud stack               Open-source stack
Data collection     DTS, DataHub                      Canal, Flume
Data transport      DTS, DataHub                      Kafka
Data storage        RDS, AnalyticDB                   MySQL, HBase
Data computation    Alibaba Cloud Realtime Compute    Flink, Spark

2.2 Binlog Collection with Canal


How Canal works:

• Canal implements the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends a dump request to the MySQL master.
• The MySQL master receives the dump request and starts pushing its binary log to the slave (i.e., to Canal).
• Canal parses the binary log objects (raw byte streams) into structured change events, as shown in the sketch below.
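
A minimal consumer sketch, based on the standard example client that ships with alibaba/canal; the server address, the destination name "example", and the empty credentials are placeholder values:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalClientDemo {
    public static void main(String[] args) {
        // connect to a canal server instance (address and destination are placeholders)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // all databases, all tables
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries
                long batchId = message.getId();
                if (batchId != -1 && !message.getEntries().isEmpty()) {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        // each entry is one parsed binlog event
                        CanalEntry.Header header = entry.getHeader();
                        System.out.println(header.getSchemaName() + "."
                                + header.getTableName() + " -> " + entry.getEntryType());
                    }
                }
                connector.ack(batchId); // confirm the batch so the cursor advances
            }
        } finally {
            connector.disconnect();
        }
    }
}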

2.3 Real-Time Binlog Transport with Kafka


Kafka carries the binlog records read by Canal over to Flink in real time. A Kafka topic can have multiple consumer groups and multiple partitions, so without care the changes for one table can be spread across groups and partitions and lose their ordering. When producing, therefore, key each record by database + table so that all changes for a given table land in the same partition and are consumed in order, as in the sketch below.
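
A producer sketch; the broker address, the topic name "binlog-topic", and the sample payload are placeholder values, and the point is the choice of record key:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class BinlogKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String database = "flink";
            String table = "dajiangtai_goods";
            String payload = "{ ... binlog row as JSON ... }"; // what the Canal client produced

            // Key the record by "database.table": the default partitioner hashes the key,
            // so every change to one table lands in the same partition, in order.
            producer.send(new ProducerRecord<>("binlog-topic", database + "." + table, payload));
        }
    }
}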

2.4 Real-Time Computation with Flink


Flink's state management and checkpoint mechanism can guarantee exactly-once semantics: each record is reflected in the computed state exactly once, even when operators fail and restart.

2.4.1 Batch and Stream Processing in Flink

Flink's core APIs support both stream processing and batch processing on a single underlying engine.



2.4.2 How Flink Guarantees Exactly-Once

Flink keeps state for every operator and snapshots that state periodically through the checkpoint mechanism; on failure it restores the latest consistent snapshot, so no record is double-counted. Enabling this takes one configuration call, as the sketch below shows.
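
A minimal configuration sketch (the 5-second checkpoint interval is just an example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // snapshot all operator state every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    }
}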

2.4.3 The Flink Table API and SQL API

Programs can be written against either the Table API or the SQL API.

  • Table API
    Once a source is connected, the data can be converted directly into a Table and transformed from there:
    // convert a DataSet into a Table with named fields
    Table table = fbTableEnv.fromDataSet(ds2, "id,name");
    Table table02 = table.select("name");

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * @author: lipei
 * @Date:2020-03-10 22:03
 */
public class BatchTableDemo {
    public static void main(String[] args) {
        // set up the batch execution environment
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

        // create a table environment on top of it
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // read the source file (one "id,name" pair per line)
        DataSet<String> ds1 = fbEnv.readTextFile("src/file/text01.txt");

        // split each line into an (id, name) tuple
        DataSet<Tuple2<String, String>> ds2 = ds1.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = -3027796541526131219L;

            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] splits = s.split(",");
                return new Tuple2<>(splits[0], splits[1]);
            }
        });

        // convert the DataSet into a Table with named fields
        Table table = fbTableEnv.fromDataSet(ds2, "id,name");

        Table table02 = table.select("name");

        // convert the Table back into a DataSet
        DataSet<String> ds3 = fbTableEnv.toDataSet(table02, String.class);

        try {
            ds3.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  • SQL API

Flink also supports querying the data directly with SQL:


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * @author: lipei
 * @Date:2020-03-10 22:03
 */
public class StreamSqlDemo {
    public static void main(String[] args) {
        // set up the streaming environment with the old planner
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // create a table environment on top of it
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // read the source file (one "id,name" pair per line)
        DataStream<String> ds1 = fsEnv.readTextFile("src/file/text01.txt");

        // split each line into an (id, name) tuple
        DataStream<Tuple2<String, String>> ds2 = ds1.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = -3027796541526131219L;

            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] splits = s.split(",");
                return new Tuple2<>(splits[0], splits[1]);
            }
        });

        // convert the DataStream into a Table with named fields
        Table table = fsTableEnv.fromDataStream(ds2, "id,name");
        table.printSchema();

        // register the Table as a view so SQL can reference it
        fsTableEnv.createTemporaryView("user01", table);

        Table table02 = fsTableEnv.sqlQuery("select * from user01").select("name");

        // convert the Table back into an append-only DataStream
        DataStream<String> ds3 = fsTableEnv.toAppendStream(table02, String.class);
        ds3.print();
        try {
            fsEnv.execute("Flink stream sql");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Implementation Approach

3.1 Requirements

Report the detail records of all orders whose goods sell for less than 10, which corresponds to the following query:

SELECT
    dajiangtai_orders.orderId,
    dajiangtai_orders.orderNo,
    dajiangtai_orders.userId,
    dajiangtai_orders.goodsId,
    dajiangtai_orders.goodsMoney,
    dajiangtai_goods.goodsName,
    dajiangtai_goods.sellingPrice,
    dajiangtai_orders.realTotalMoney,
    dajiangtai_orders.payFrom,
    dajiangtai_users.phoneNum,
    dajiangtai_users.userName,
    dajiangtai_users.address,
    dajiangtai_orders.createTime AS "orderCreateTime" 
FROM
    dajiangtai_orders 
    LEFT JOIN dajiangtai_goods  ON dajiangtai_orders.goodsId = dajiangtai_goods.goodsId
    LEFT JOIN dajiangtai_users  ON dajiangtai_orders.userId = dajiangtai_users.userId  
WHERE
    dajiangtai_goods.sellingPrice < 10
  • Table mapping: dajiangtai_orders joins dajiangtai_goods on goodsId and dajiangtai_users on userId, as in the query above.

3.2 Layered Design

  • DWD layer
    Goal: materialize the full detail table, with no filtering applied:
SELECT
    dajiangtai_orders.orderId,
    dajiangtai_orders.orderNo,
    dajiangtai_orders.userId,
    dajiangtai_orders.goodsId,
    dajiangtai_orders.goodsMoney,
    dajiangtai_goods.goodsName,
    dajiangtai_goods.sellingPrice,
    dajiangtai_orders.realTotalMoney,
    dajiangtai_orders.payFrom,
    dajiangtai_users.phoneNum,
    dajiangtai_users.userName,
    dajiangtai_users.address,
    dajiangtai_orders.createTime AS "orderCreateTime" 
FROM
    dajiangtai_orders 
    LEFT JOIN dajiangtai_goods  ON dajiangtai_orders.goodsId = dajiangtai_goods.goodsId
    LEFT JOIN dajiangtai_users  ON dajiangtai_orders.userId = dajiangtai_users.userId  
  • DM layer
    Goal: filter columns and rows on top of the DWD layer:

SELECT
    *
FROM
    flink_order_detail
WHERE
    sellingPrice < 10

3.3 DWD Data Synchronization

3.3.1 INSERT on a Source Table

a. Find the ID of the affected row in the child table.
b. Query the affected master detail records.
c. Perform an insertOrUpdate into the DWD store.

For example:

INSERT INTO `flink`.`dajiangtai_goods`(`goodsId`, `goodsName`, `sellingPrice`, `goodsStock`, `appraiseNum`) VALUES (1, '2', 20.00, 1, 1);

a. In the child table `flink`.`dajiangtai_goods`, the affected row key is goodsId = 1.
b. Query the affected detail records:
SELECT
    dajiangtai_orders.orderId,
    dajiangtai_orders.orderNo,
    dajiangtai_orders.userId,
    dajiangtai_orders.goodsId,
    dajiangtai_orders.goodsMoney,
    dajiangtai_goods.goodsName,
    dajiangtai_goods.sellingPrice,
    dajiangtai_orders.realTotalMoney,
    dajiangtai_orders.payFrom,
    dajiangtai_users.phoneNum,
    dajiangtai_users.userName,
    dajiangtai_users.address,
    dajiangtai_orders.createTime AS "orderCreateTime" 
FROM
    dajiangtai_orders 
    LEFT JOIN dajiangtai_goods  ON dajiangtai_orders.goodsId = dajiangtai_goods.goodsId
    LEFT JOIN dajiangtai_users  ON dajiangtai_orders.userId = dajiangtai_users.userId  
WHERE
    dajiangtai_goods.goodsId IN (1)
c. Sync the records returned in step b into the DWD database; a sketch of this insertOrUpdate follows.
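
A sketch of step c as a plain JDBC upsert, assuming flink_order_detail (the DWD table from section 3.2) has orderId as its primary key; the connection URL, credentials, sample values, and the reduced column list are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class DwdUpsertDemo {
    // MySQL upsert: insert the row, or update it if the primary key already exists.
    // Only a few of the detail columns are shown here.
    private static final String UPSERT_SQL =
            "INSERT INTO flink_order_detail (orderId, goodsId, goodsName, sellingPrice) "
          + "VALUES (?, ?, ?, ?) "
          + "ON DUPLICATE KEY UPDATE goodsName = VALUES(goodsName), "
          + "sellingPrice = VALUES(sellingPrice)";

    public static void main(String[] args) throws Exception {
        // placeholder URL and credentials
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/dwd", "user", "password");
             PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            ps.setLong(1, 1001L);                                   // orderId (sample value)
            ps.setLong(2, 1L);                                      // goodsId from the binlog event
            ps.setString(3, "2");                                   // goodsName
            ps.setBigDecimal(4, new java.math.BigDecimal("20.00")); // sellingPrice
            ps.executeUpdate();
        }
    }
}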

3.3.2 UPDATE on a Source Table

a. Find the ID of the affected row in the child table.
b. Query the affected master detail records.
c. Perform an insertOrUpdate into the DWD store.

For example:

UPDATE `flink`.`dajiangtai_goods` SET sellingPrice = 25 WHERE `goodsId` = 1;

a. In the child table `flink`.`dajiangtai_goods`, the affected row key is goodsId = 1.
b. Query the affected detail records, using the same join query as in 3.3.1 (again filtered by dajiangtai_goods.goodsId IN (1)).
c. Sync the records returned in step b into the DWD database.

3.4 DM Data Synchronization

The DM layer can be synchronized with the same steps as in 3.3 and written straight to storage, or it can be computed directly from the DWD layer with a WHERE query, as in the sketch below.
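
A runnable sketch of the second option, following the StreamSqlDemo pattern from section 2.4.3; the fromElements source and its two columns are a stand-in for the real DWD stream, which would come from the Kafka topic fed by Canal:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DmLayerSqlDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // stand-in for the DWD detail stream (goodsId and sellingPrice only)
        DataStream<Tuple2<Integer, Double>> details = env.fromElements(
                Tuple2.of(1, 8.5), Tuple2.of(2, 25.0));
        tEnv.createTemporaryView("flink_order_detail",
                tEnv.fromDataStream(details, "goodsId,sellingPrice"));

        // the DM layer is just the WHERE query from section 3.2 over the DWD view
        Table dm = tEnv.sqlQuery("SELECT * FROM flink_order_detail WHERE sellingPrice < 10");
        tEnv.toAppendStream(dm, Row.class).print();

        env.execute("DM layer over DWD");
    }
}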
