概述
mysql同步数据到hive大部分公司目前都是走的jdbc的方式。
这种方式有两个好处:
- 开发简单。只需要从mysql读取相关的数据,插入到hive表当中就行了。Sqoop 或者 waterdrop 都是走的这个逻辑。
- 稳定。因为是直接读取mysql的jdbc的数据,基本不会有数据不一致的问题。
也有不好的地方:
- 对数据库的压力比较大。特别是每小时全量同步一些大的表的任务,都走jdbc的话,mysql的压力会比较大。
binlog同步mysql数据技术方案
总体架构设计
- gorail将mysql的binlog数据同步到kafka
- flink消费kafka的数据写入到hdfs
- spark批处理合并binlog增量数据和历史数据,生成最新的数据快照(这一步比较复杂,后面详细说明)
具体细节
gorail将mysql的binlog数据同步到kafka
这一步最主要的细节是将mysql库的所有binlog数据全部打入一个kafka topic,格式使用json。格式如下:
{
"action":"insert",//update、insert、delete
"gorail_nano":"1578131519599439478",//gorail处理时间
"primary_keys":"[[610927]]",//主键
"raw_rows":"[]",//更新前数据
"rows":"[map[action:1 content_id:1600040219 create_time:2020-01-04 17:51:59 id:610927 op_id:472 op_name:张玲玲]]",//更新后数据
"schema":"qukan",//库名
"table":"content_op_log",//表名
"timestamp":"1578131519"//binlog产生时间
}
- 为什么选择json?
因为json结构比较灵活,每个mysql的表的数据结构都不一样,json会更好记录这些变化的数据。 - 为什么打入到一个kafka topic?
这个主要是因为方便,后面消费的时候也只需要起一个flink任务,便于统一管理。
flink消费kafka的数据写入到hdfs
这一步的主要的细节在于写入到hdfs的结构,以及为什么不直接写入hive。
不写入到hive表的原因在于,binlog的数据结构是不固定的,而hive的结构相对是比较固定的。如果要写入到hive的话,就需要将不同的表的binlog写入到不同的hive表中,这个维护成本太高了。而且spark其实可以直接读取hdfs的json文件,因此直接放hdfs就好了。
写入到hdfs的话,考虑到后续读这个数据是要按照表去读增量数据,所以写入的目录一定是要带日期和表名称的。我这边用的目录结构是这样的:
${binlog_file_path}/schema=${mysql_db_name}/table=${mysql_table_name}/dt=${date}/${json_file}
也就是说要在flink根据数据所属的db、table_name、和日期将数据写入到不同的目录里。
在这一步的处理的过程中遇到了一些比较重要的参数问题。
- 首先是选择Rolling Policy。flink提供了两种rolling policy,分别是DefaultRollingPolicy和OnCheckpointRollingPolicy。这两者的区别是前者是可以配置多个触发生成新文件的条件,比如文件大小达到xxx,比如时间间隔超过xxx。而后者比较简单,就是在每次checkpoint的时候写入新的文件,而checkpoint是按照时间间隔(每多长时间触发一次)触发的,所以后者就是在每xxx时间生成一个新文件。另外还有个比较坑的地方,我把官方文档的截图放下面了。如果你的hadoop版本小于2.7 就尽量使用OnCheckpointRollingPolicy。否则你的任务就会过一段时间报个错误,说是找不到 truncate方法。
2.如上所述checkpoint的时间间隔。不仅仅会影响checkpoint的频率,而且会影响hdfs文件的大小,而hdfs文件的大小可能会对hdfs的性能有很大影响。这个值如果太大,就会造成数据延迟太高,如果太小就会造成小文件过多。我这边设置的是5分钟。
细心的看官,这个时候会问了,既然你的目录是分table的,那么每个table每5分钟的binlog数据量是不一样的。对于某些大的mysql表,我们可能每5分钟生成一个文件还能接受。对于一些比较小的表,每五分钟生成一个文件那么文件就会非常小。所以我这边又做了一层的筛选,我把mysql的大的表筛选出来,只同步大的表到hdfs,用以binlog的数据同步。因为本身binlog的方式同步mysql数据为的就是节约mysql的读取压力,而小的表对于不会有太大压力,这些表可以直接通过jdbc的方式去同步。
spark批处理合并binlog增量数据和历史数据
这个是整个环节里面最复杂的一部分,涉及的细节也比较多。
首先,我们要明确一下总体的思路是什么。总体的思路就是要读取hdfs上的老的历史数据,然后和新的binlog数据合并生成新的快照。
- 读取老数据
如果是首次导入数据,那么没有老数据,我们就需要通过jdbc从mysql全量同步一次。
如果已经有老数据了。那么我们需要从该表读取老数据,然后与新的binlog的数据合并。 - 数据合并
注意这里binlog的数据需要做一些处理。因为之前说过,binlog的数据是一个特殊的json格式。我们需要按照hive表的字段结构,将binlog的json结构转化成我们想要的固定的hive结构的dataframe。然后将这个dataframe和hive表的老数据union all之后,按照partition by id order by time 取最后一条就行了。 - 数据验证
因为binlog的数据会自动清理,也有可能出现不稳定的情况,可能会导致后续数据一直是错的。所以我们在生成了hive的最新的快照之后,会和mysql的数据做一次比对。数据量差异超过1/1000那么就会全量通过jdbc的方式从mysql抽取一次数据。这一步非常关键,这个是对数据准确性的一个极大的保障。
其实这中间还涉及到一些其他的细节,比如mysql表结构变更,或者mysql和hive的数据结构不一致的情况。
另外我们这边还存在多个db的相同的表导入到hive的一张表中的其他问题,我就不赘述了。
网友评论