美文网首页
mysql同步数据到hive---binlog方式

mysql同步数据到hive---binlog方式

作者: cyangssrs | 来源:发表于2020-07-08 14:58 被阅读0次

    概述

    mysql同步数据到hive大部分公司目前都是走的jdbc的方式。

    这种方式有两个好处:

    1. 开发简单。只需要从mysql读取相关的数据,插入到hive表当中就行了。Sqoop 或者 waterdrop 都是走的这个逻辑。
    2. 稳定。因为是直接读取mysql的jdbc的数据,基本不会有数据不一致的问题。

    也有不好的地方:

    1. 对数据库的压力比较大。特别是每小时全量同步一些大的表的任务,都走jdbc的话,mysql的压力会比较大。

    binlog同步mysql数据技术方案

    总体架构设计

    1. gorail将mysql的binlog数据同步到kafka
    2. flink消费kafka的数据写入到hdfs
    3. spark批处理合并binlog增量数据和历史数据,生成最新的数据快照(这一步比较复杂,后面详细说明)

    具体细节

    gorail将mysql的binlog数据同步到kafka

    这一步最主要的细节是将mysql库的所有binlog数据全部打入一个kafka topic,格式使用json。格式如下:

    {
        "action":"insert",//update、insert、delete
        "gorail_nano":"1578131519599439478",//gorail处理时间
        "primary_keys":"[[610927]]",//主键
        "raw_rows":"[]",//更新前数据
        "rows":"[map[action:1 content_id:1600040219 create_time:2020-01-04 17:51:59 id:610927 op_id:472 op_name:张玲玲]]",//更新后数据
        "schema":"qukan",//库名
        "table":"content_op_log",//表名
        "timestamp":"1578131519"//binlog产生时间
    }
    
    1. 为什么选择json?
      因为json结构比较灵活,每个mysql的表的数据结构都不一样,json会更好记录这些变化的数据。
    2. 为什么打入到一个kafka topic?
      这个主要是因为方便,后面消费的时候也只需要起一个flink任务,便于统一管理。

    flink消费kafka的数据写入到hdfs

    这一步的主要的细节在于写入到hdfs的结构,以及为什么不直接写入hive。
    不写入到hive表的原因在于,binlog的数据结构是不固定的,而hive的结构相对是比较固定的。如果要写入到hive的话,就需要将不同的表的binlog写入到不同的hive表中,这个维护成本太高了。而且spark其实可以直接读取hdfs的json文件,因此直接放hdfs就好了。

    写入到hdfs的话,考虑到后续读这个数据是要按照表去读增量数据,所以写入的目录一定是要带日期和表名称的。我这边用的目录结构是这样的:

    ${binlog_file_path}/schema=${mysql_db_name}/table=${mysql_table_name}/dt=${date}/${json_file}
    

    也就是说要在flink根据数据所属的db、table_name、和日期将数据写入到不同的目录里。

    在这一步的处理的过程中遇到了一些比较重要的参数问题。

    1. 首先是选择Rolling Policy。flink提供了两种rolling policy,分别是DefaultRollingPolicy和OnCheckpointRollingPolicy。这两者的区别是前者是可以配置多个触发生成新文件的条件,比如文件大小达到xxx,比如时间间隔超过xxx。而后者比较简单,就是在每次checkpoint的时候写入新的文件,而checkpoint是按照时间间隔(每多长时间触发一次)触发的,所以后者就是在每xxx时间生成一个新文件。另外还有个比较坑的地方,我把官方文档的截图放下面了。如果你的hadoop版本小于2.7 就尽量使用OnCheckpointRollingPolicy。否则你的任务就会过一段时间报个错误,说是找不到 truncate方法。
    image.png

    2.如上所述checkpoint的时间间隔。不仅仅会影响checkpoint的频率,而且会影响hdfs文件的大小,而hdfs文件的大小可能会对hdfs的性能有很大影响。这个值如果太大,就会造成数据延迟太高,如果太小就会造成小文件过多。我这边设置的是5分钟。
    细心的看官,这个时候会问了,既然你的目录是分table的,那么每个table每5分钟的binlog数据量是不一样的。对于某些大的mysql表,我们可能每5分钟生成一个文件还能接受。对于一些比较小的表,每五分钟生成一个文件那么文件就会非常小。所以我这边又做了一层的筛选,我把mysql的大的表筛选出来,只同步大的表到hdfs,用以binlog的数据同步。因为本身binlog的方式同步mysql数据为的就是节约mysql的读取压力,而小的表对于不会有太大压力,这些表可以直接通过jdbc的方式去同步。

    spark批处理合并binlog增量数据和历史数据

    这个是整个环节里面最复杂的一部分,涉及的细节也比较多。
    首先,我们要明确一下总体的思路是什么。总体的思路就是要读取hdfs上的老的历史数据,然后和新的binlog数据合并生成新的快照。

    1. 读取老数据
      如果是首次导入数据,那么没有老数据,我们就需要通过jdbc从mysql全量同步一次。
      如果已经有老数据了。那么我们需要从该表读取老数据,然后与新的binlog的数据合并。
    2. 数据合并
      注意这里binlog的数据需要做一些处理。因为之前说过,binlog的数据是一个特殊的json格式。我们需要按照hive表的字段结构,将binlog的json结构转化成我们想要的固定的hive结构的dataframe。然后将这个dataframe和hive表的老数据union all之后,按照partition by id order by time 取最后一条就行了。
    3. 数据验证
      因为binlog的数据会自动清理,也有可能出现不稳定的情况,可能会导致后续数据一直是错的。所以我们在生成了hive的最新的快照之后,会和mysql的数据做一次比对。数据量差异超过1/1000那么就会全量通过jdbc的方式从mysql抽取一次数据。这一步非常关键,这个是对数据准确性的一个极大的保障。

    其实这中间还涉及到一些其他的细节,比如mysql表结构变更,或者mysql和hive的数据结构不一致的情况。
    另外我们这边还存在多个db的相同的表导入到hive的一张表中的其他问题,我就不赘述了。

    相关文章

      网友评论

          本文标题:mysql同步数据到hive---binlog方式

          本文链接:https://www.haomeiwen.com/subject/cueyjctx.html