美文网首页
2020-11-29 实时同步

2020-11-29 实时同步

作者: LancerLin_LX | 来源:发表于2020-12-08 09:49 被阅读0次

    背景

    数据源进入数据仓库,需要一步ETL操作,传统通过离线的方式,将前一天T+1的数据导入到ODS层中。但是随着公司业务不断发展,数据量不断增加,这种T+1的导入方式,耗时越来越长,任务的稳定性越来越差,账单表2019年11月的增量数据为6000万/天,2020年11月的增量数据为1.8亿/天,增长了3倍,按原来的方式,离线拉取数据需要2 - 3小时,如果失败了,需要重新拉取。重新拉取需要2 - 3小时,后置任务都需要等待无法执行,集群资源空闲,当账单表任务完成后,所有的后置任务都启动了,扎堆运行,会导致资源负载非常高。账单表任务的延迟还会导致当天的任务都会延迟2~3小时,相关报表延迟展示,所有的分析、运营人员需要的数据也会延迟,相关分析人员可能要到下午才能开始正常分析、挖掘。
    我们一年的KPI失败次数是12次,而5月份,已经失败了6次,每次延迟或者失败,都会有凌晨4点的告警电话,值班人员起床手动重跑。

    image.png
    • 下图是集群资源使用的监控信息
    • 离线导入数据时间长,核心宽表任务没有完成,后置任务无法启动,集群无法工作,资源使用率低,任务无法使用,0~3点集群利用率低, CPU平均使用率38%内存 平均使用率 50%
    • 3点到14点之间,大量任务运行,内存平均使用率98%CPU平均使用率75%,如果能够将0~3点这个时间段利用起来,任务可以提前3小时完成
      image.png

    为了提高核心宽表稳定性、降低耗时、提高集群资源利用率,需要对原来的方案进行优化。

    方案

    目标

    利用实时计算实现数据实时同步,降低离线采集时间,提高任务稳定性,从而提高集群利用率。用小吃多餐的方式,分担离线一次性导入数据的性能瓶颈,该方案只适用于T+1增量采集的数据表。

    数据流

    image.png
    • 原来离线采集的方式,需要将数据通过网络,拉取到本地,再上传到集群里的spark表,数据量大的时候,不仅拉取上传的时间长,还会导致客户端机器负载非常高,容易出现异常
    • 通过玄武实时采集的方式同步到kafka,再通过实时任务将数据实时同步到spark表。将一整天的数据,拆分到每分每秒,分而治之。当前一天的数据没有延迟,就可以直接使用,不需要原来离线采集的拉取跟上传操作

    简单例子

    image.png
    • 首先我们的目的就是需要将DB表的数据导入到spark对应的一张表里
    • DB表只有3条数据,将离线同步数据的操作,转化为对流水数据的全量同步后,流水数据会有6条操作数据,对应spark表也会有6条数据
    • 最后将spark表的6条数据,按一定的逻辑去重,这里的逻辑主键为id+operType,按照这2个组成的规则去重后,得到的数据,就是跟DB表的数据一致

    具体实现

    mysql表

    比如mysql表是以下5个字段的数据

    Findex Forder_id Fversion Fcreate_time Fmodify_time
    17106 O20201208505953012260 2 20-12-08 15:10:07 2020-12-08 15:12:25

    kafka数据

    {
        "headers":{
            "data_size":"254",
            "log_file_offset":"813159285",
            "database_name":"fund_credit_trade_each_26_db",
            "pipeline_time_consuming":"204",
            "log_file_name":"binlog.000040",
            "db_move_version":"0",
            "table_name":"t_trade_order_amount_0",
            "start_time":"2020-12-08 15:12:00",
            "event_execute_time":"1607411545000",
            "hash_code":"5",
            "record_oper_type":"U",
            "exec_time":"2020-12-08 15:12:25",
            "data_table_id":"480",
            "monitor_topic":"C8_JG_OTTER_FundcredittradeDB_fund_credit_trade_each_--_db.t_trade_order_amount_-_topic",
            "murmur_hash_code":"1949271285",
            "pipeline_time_consuming_without_canal":"4",
            "schema_table_name":"fund_credit_trade_each_--_db.t_trade_order_amount_-"
        },
        "body":{
            "Findex":"17106",
            "Forder_id":"O20201208505953012260",
            "Fversion":"2",
            "Fcreate_time":"2020-12-08 15:10:07",
            "Fmodify_time":"2020-12-08 15:12:25"
        }
    }
    
    
    • 玄武实时采集的时候,将mysql表的数据,存放到body里面,用K-V的json格式,跟原表数据保持原样,然后再headers会增加许多元数据信息,如binlog信息、库表信息等。

    spark表

    相比原来的离线采集的方式,实时采集的数据是binlog流水日志,spark表不支持update操作,为了数据的最终一致性,需要增加一些逻辑字段,方便对数据进行去重操作。除了保留DB表的所有字段,还会增加以下字段。

    字段名 对应字段 描述
    fetl_time 数据的写入spark表时间
    fencrypt_version 脱敏数据版本
    f_etl_offset kafka 中数据的offset
    fetl_version SDK写入的版本号
    f_etl_db_name headers 中的database_name 数据从属的库名
    f_etl_table_name headers 中的table_name 数据从属的表名
    f_etl_op_type headers 中的record_oper_type 操作类型 string,值: “U”,“I”,“D”,分别代表 update,insert,delete
    fetl_event_execute_time headers 中的event_execute_time binlog event时间
    fetl_db_move_version headers 中的db_move_version 重置位点次数/位点版本
    fetl_binlog_name headers 中的log_file_name,去掉前面7个字符 binlog文件ID
    fetl_binlog_position headers 中的log_file_offset binlog位点

    spark表去重逻辑

    (
        select o.*,
            row_number() over(
                partition by f_etl_db_name,
                f_etl_table_name,
                fid (相关主键,可以是orderid,findex...)
                order by 
                    fetl_binlog_name desc,
                    fetl_binlog_position desc,
                        case
                        when f_etl_op_type = 'D' then 1
                        when f_etl_op_type = 'U' then 2
                        when f_etl_op_type = 'I' then 3
                        else 4
                    end
            ) as frank
        from rt_ods.dspostloandb_postloan_xx_db_t_user_repay_info_y o
        WHERE f_p_date = '2020-12-07'
    ) t
    where frank = 1
        and f_etl_op_type != 'D'
    
    
    image.png
    • 逻辑就是通过binlog数据,找到最新的一条数据,我们用一个例子说明
    • 首先我们知道fetl_binlog_name、 fetl_binlog_position 都是递增的,fetl_binlog_name binlog文件名,越大说明数据越新,fetl_binlog_position 位点信息,越大代表数据越新,
    • 在mysql中orderId O20201215897453114900,在2020-12-05 08:59:02.0新建后(查看表中的f_etl_op_type=I),更新了7次(查看表中的f_etl_op_type=U),一共是8条binlog日志,最后的数据是图中的第一条
    • 在Spark表中将完整记录这8条数据(1条Insert, 7条Update),为了让Spark表 8条数据去重后跟 mysql orderId O202012158974531149001条数据数据一致,需要对这8条数据去重,逻辑就是上面的Sql。
    • 利用partition by找到同个库同个表以及该表的主键的所有数据,按照fetl_binlog_name desc,fetl_binlog_position desc,并且操作流水不为'D'(删除)的数据,就是找到最新的且不是删除的一条数据,就是2020-12-05 09:01:55.0这条数据,对应mysql最新的数据。

    实时采集主要遇到的问题

    使用要求

    • 实时同步只适合增量任务
    • 业务方需要按照DBA规范:更新数据的时候需要更新Fmodify_time

    表名生成规则

    instanceName.toLowerCase()+"_"+dbName.replace("#", "").toLowerCase()+"_"+tableName.replace("#", "").toLowerCase()
    
    
    • 数据源支持:分库分表、百库十表、单库单表,实时同步到相应的1张Spark表
    • 实例名:WateruserinfoDB
    • 库名:user_info_water_fql_#xx#_#yy#_db
    • 表名:t_user_info_water_fql_#z#
    • 生成的实时表:rt_ods.wateruserinfodb_user_info_water_fql_xx_yy_db_t_user_info_water_fql_z

    数据吞吐量

    • 目前接入256实例,28000+个表,每日处理量54000/秒,11MB/秒

    数据准确性

    • 通过binlog数据完整性来实现数据准确性,发送端保证数据不丢失,消费端保证数据不漏消费
    • 如果binlog文件过大,导致磁盘容量不足,为了保证系统正常,binlog文件会被删除文件被删除的话。采集来不及采集就会导致数据就缺失,最终导致Spark表数据缺失。所以通过增加 f_etl_offset、fetl_event_execute_time、fetl_db_move_version、fetl_binlog_name bigint、fetl_binlog_position bigint 4个字段,来判断binlog是否完整,如果不完整,则走原来的离线同步方式

    数据可用性

    • 实时消费的时候,将表的最大时间记录起来,通过查记录跟当前时间比较,判断是否延迟同步来判断Spark表数据是否可用

    spark表数据去重

    每一条数据都有唯一键,唯一键就是标识是唯一一条数据的,加上我们增加的附加字段组合作为去重的逻辑。可以完美解决解决的是数据重复问题。

    效果

    当天数据已经同步好,不在需要离线同步的时间,只需要将数据按规则去重后,便可以使用,减少了数据导入的时间,大大提高集群利用率以及任务完成率。


    image.png
    • 减少账单表数据导入时间1.5 ~ 3小时
    • 核心任务9点前完成率从60%提升到 85% 提升25%
    • 宽表任务3点前完成率从27%提升到95% 提升66%
    • 0~3点资源使用率 内存88%,提升21%,CPU 59%,提升14%
    • 上线后4个月宽表异常0次,管理组同事无需4点起床处理

    运营

    • 目前用于数据仓库重要任务日采集成功数620-780,成功率82%~91%
    • 跟踪每日实时同步情况,对异常问题,进行定位分析,解决项目中的bug,提高实时同步任务成功率跟成功数
    image.png image.png

    总结

    实时同步利用了大数据解决方案的架构设计的解决思路、设计思想 -- 分而治之。
    其实生活中这样的例子无处不在,例如让你扛一袋沙子,我一次扛不动,那么我拿个小桶,分开一桶一桶的搬,这其实就是分而治之的思路。

    海量数据处理的分而治之

    所谓海量数据处理,其实很简单,海量,海量,何谓海量,就是数据量太大,所以导致要么是无法在较短时间内迅速解决,要么是数据太大,导致无法一次性装入内存。
    那解决办法呢?针对空间,无非就一个办法:大而化小:分而治之/hash映射,就把规模大化为规模小的。

    • 一道常见的面试题
      给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
      假如每个url大小为10bytes,那么可以估计每个文件的大小为50G×64=320G,远远大于内存限制的4G,所以不可能将其完全加载到内存中处理,可以采用分治的思想来解决。

    大数据框架设计的分而治之

    分布式存储HDFS中,采用将大文件切片的方式,比如一个1G的文件,切分成8个128MB的文件,在分布式计算框架中,不管是MR引擎,还是Spark计算引擎,利用的都是这个原理,大文件处理慢,就切分成多个小文件,各个处理,提高并行度。
    Kafka中的topic拥有多个partition也是这种思想,通过提高parition数量提升吞吐量。
    分而治之还有JDK7中的Fork/Join框架,是一个并行执行任务的框架。原理:把大任务割成若干小任务,最终汇总小任务的结果得到大任务结果的框架,可以理解成一个单机版的MR/Spark计算框架。

    image.png

    架构设计的分而治之

    image.png

    利用分而治之的思想,在实时同步的架构中实现。从上图可见,我们将数据读取T+1修改为从mysql binlog的方式去获取数据,采用玄武进行binlog日志的获取以及解析。这其实也是一个拆分的思想,从原先的数据拉到客户端机器,一次性处理,拆分成数据端-DB-玄武-Kafka,将一整天的数据,拆分到每分每秒处理,分而治之。过程虽然变长了,但是实时性跟稳定性大幅度提升。
    在理想情况下,我们在不清楚业务系统需求的时候,设计出来的架构跟具体业务系统的架构是不吻合的。当业务系统无法提供你所要的需求的时候,从不同的层面去思考,将业务进行拆分,或许会给你不一样的解决方案。

    参考链接:
    https://zhuanlan.zhihu.com/p/55869671
    https://baijiahao.baidu.com/s?id=1649619201425234676&wfr=spider&for=pc

    相关文章

      网友评论

          本文标题:2020-11-29 实时同步

          本文链接:https://www.haomeiwen.com/subject/xjbkwktx.html