Maxwell: Syncing MySQL Incremental Table Data to Kafka and Consuming It into HDFS

Author: Yobhel | Published 2023-11-05 10:26

    1 Data Pipeline

    (Figure: data pipeline diagram)

    2 Maxwell Configuration
    By default, Maxwell syncs the change records of every table in the binlog. According to the plan, 11 tables (cart_info, order_info, and so on) need incremental sync, so in principle Maxwell should be configured to sync only those 11 tables. To stay consistent with the real-time data warehouse architecture, however, no such restriction is applied here: Maxwell syncs the change records of all tables in the binlog and sends everything to the topic_db topic (a filter sketch follows the configuration below).
    The final Maxwell configuration is as follows:
    1) Modify the Maxwell configuration file config.properties

    [yobhel@hadoop101 maxwell]$ vim /opt/module/maxwell/config.properties
    

    2) The complete configuration parameters are as follows

    log_level=info
    
    producer=kafka
    kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
    
    # Kafka topic configuration: the target topic for the business data
    kafka_topic=topic_db
    # mysql login info
    host=hadoop101
    user=maxwell
    password=maxwell
    jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
    
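    As noted above, if you did want Maxwell to sync only the 11 planned tables instead of everything, Maxwell's filter option could be added to config.properties. A minimal sketch, assuming Maxwell's include/exclude filter syntax and taking the table list from the bootstrap script later in this article:

    #filter=exclude: *.*, include: edu2077.cart_info, include: edu2077.comment_info, include: edu2077.favor_info, include: edu2077.order_detail, include: edu2077.order_info, include: edu2077.payment_info, include: edu2077.review_info, include: edu2077.test_exam, include: edu2077.test_exam_question, include: edu2077.user_info, include: edu2077.vip_change_detail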

    3) Restart Maxwell

    [yobhel@hadoop101 bin]$ mxw.sh restart
    
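    The mxw.sh script itself is not shown in this article. A minimal sketch of such a Maxwell start/stop wrapper, assuming Maxwell is installed under /opt/module/maxwell, might look like this:

    #!/bin/bash
    MAXWELL_HOME=/opt/module/maxwell

    start_maxwell() {
        # --daemon starts Maxwell in the background
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    }

    stop_maxwell() {
        # kill the running Maxwell process, if any
        ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs -r kill
    }

    case $1 in
    "start")   start_maxwell ;;
    "stop")    stop_maxwell ;;
    "restart") stop_maxwell; sleep 2; start_maxwell ;;
    esac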

    4) Pipeline test
    (1) Start the ZooKeeper and Kafka clusters
    (2) Start a Kafka console consumer to consume the topic_db topic

    [yobhel@hadoop101 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_db
    

    (3) Generate mock data

    [yobhel@hadoop101 bin]$ cd /opt/module/data_mocker/
    [yobhel@hadoop101 data_mocker]$ java -jar edu2021-mock-2022-06-18.jar
    

    (4) Check whether the Kafka consumer receives the data

    {"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37606,"commit":true,"data":{"id":23899,"user_id":16,"origin_amount":800.00,"coupon_reduce":0.00,"final_amount":800.00,"order_status":"1002","out_trade_no":"211814417714292","trade_body":"大数据技术之Zookeeper(2021最新版)等4件商品","session_id":"3a96bddb-7f94-4a0f-9a5b-1aa6fadd718c","province_id":30,"create_time":"2022-02-21 15:15:14","expire_time":"2022-02-21 15:30:14","update_time":"2022-02-21 15:15:42"},"old":{"order_status":"1001","update_time":null}}
    {"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37589,"commit":true,"data":{"id":23900,"user_id":473,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1003","out_trade_no":"462573352988853","trade_body":"尚硅谷大数据技术之Azkaban等1件商品","session_id":"d78dd675-5a38-4e33-b431-b1ef68a89089","province_id":29,"create_time":"2022-02-21 11:26:30","expire_time":"2022-02-21 11:41:30","update_time":"2022-02-21 11:41:47"},"old":{"order_status":"1001","update_time":null}}
    {"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37694,"commit":true,"data":{"id":23901,"user_id":70,"origin_amount":400.00,"coupon_reduce":0.00,"final_amount":400.00,"order_status":"1002","out_trade_no":"677577676596486","trade_body":"尚硅谷大数据技术之Shell等2件商品","session_id":"9b842bcc-3288-49da-8ec2-0e00d743b783","province_id":33,"create_time":"2022-02-21 19:45:13","expire_time":"2022-02-21 20:00:13","update_time":"2022-02-21 19:45:33"},"old":{"order_status":"1001","update_time":null}}
    
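    Since all tables share the single topic_db topic, it can be convenient while testing to filter the consumer output for one table, for example (the field name matches the JSON records above):

    [yobhel@hadoop101 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_db | grep '"table":"order_info"'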

    3 Flume Configuration

    1) Flume configuration overview
    Flume needs to move the business data from Kafka to HDFS, so it uses a KafkaSource and an HDFSSink, with a FileChannel as the channel.
    Note that the KafkaSource subscribes to the topic_db topic, and the HDFSSink must write the data of each of the 11 tables to its own path; each path should also contain a date layer to separate each day's data. The key configuration items are as follows:


    (Figure: key Flume configuration items)

    A concrete data example is shown below:


    (Figure: sample data)
    2) Flume configuration in practice
    (1) Create the Flume configuration file
    Create kafka_to_hdfs_db.conf in the Flume job directory on the hadoop103 node
    [yobhel@hadoop103 job]$ vim kafka_to_hdfs_db.conf 
    

    (2) The configuration file content is as follows

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
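    ## source1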
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
    a1.sources.r1.kafka.topics = topic_db
    a1.sources.r1.kafka.consumer.group.id = flume
    a1.sources.r1.setTopicHeader = true
    a1.sources.r1.topicHeader = topic
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.TimestampAndTableNameInterceptor$Builder
    
    
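    ## channel1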
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/data/flume/checkpoint/behavior2
    a1.channels.c1.dataDirs = /opt/data/flume/data/behavior2
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = db
    a1.sinks.k1.hdfs.round = false
    
    
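    ## roll a new file every 10 seconds or 128 MB; never roll by event count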
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    ## Wire the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    

    (3) Write the Flume interceptor
    Code: https://github.com/Yobhel121/edu-flume-interceptor

    As its class name suggests, the interceptor parses each Maxwell JSON record and puts the table name and the ts timestamp into the tableName and timestamp event headers, which the HDFSSink path above relies on. Package the project and place the resulting jar in the /opt/module/flume/lib directory on hadoop103.
    3) Pipeline test
    (1) Start the ZooKeeper and Kafka clusters
    (2) Start Flume on hadoop103

    [yobhel@hadoop103 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console
    

    (3) Uncomment the Maxwell-related content in mock.sh, then run the script to generate mock data

    #!/bin/bash
    DATA_HOME=/opt/module/data_mocker
    MAXWELL_HOME=/opt/module/maxwell
    
    function mock_data() {
      if [ $1 ]
      then
        sed -i "/mock.date/s/.*/mock.date: \"$1\"/" $DATA_HOME/application.yml
        echo "Generating data for $1"
      fi
      cd $DATA_HOME
          nohup java -jar "edu2021-mock-2022-03-14.jar" >/dev/null 2>&1  
    }
    
    case $1 in
    "init")
      [ $2 ] && do_date=$2 || do_date='2022-02-21'
      sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 1/" $DATA_HOME/application.yml
      sed -i "/mock.clear.user/s/.*/mock.clear.user: 1/" $DATA_HOME/application.yml
      mock_data $(date -d "$do_date -5 days" +%F)
      sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 0/" $DATA_HOME/application.yml
      sed -i "/mock.clear.user/s/.*/mock.clear.user: 0/" $DATA_HOME/application.yml
      for ((i=4;i>=0;i--));
      do
        mock_data $(date -d "$do_date -$i days" +%F)
      done
      ;;
    [0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9])
    
        sed -i "/mock_date/s/.*/mock_date=$1/" $MAXWELL_HOME/config.properties
        mxw.sh restart
        sleep 1  
        mock_data $1
        ;;
    esac
    

    Run the script to generate data

    [yobhel@hadoop101 bin]$ mock.sh 2022-02-22
    

    (4) Check whether data appears under the target path on HDFS
    If incremental-table data has appeared under the target HDFS path, the data pipeline has been verified.
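
    A quick way to check is to list the per-table directories under the base path, for example:

    [yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db/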

    (5) A note on the date in the target path
    If you look carefully, you will notice that the date in the target path is not the business date of the mock data but the current date. This is because the value of the ts field in the JSON strings that Maxwell outputs is the date on which the data actually changed; in a real production scenario, the business date and the change date are the same.

    To simulate a real environment here, the Maxwell source code was modified to add a mock_date parameter, which forces the date of the ts timestamp in Maxwell's JSON output. This is tested next.

    The mock.sh script modifies this Maxwell configuration each time it generates data.
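
    For example, after running mock.sh 2022-02-22, the sed command in the script leaves a line like the following in config.properties (value shown for illustration only):

    mock_date=2022-02-22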

    4) Write a Flume start/stop script
    For convenience, write a start/stop script for Flume.
    (1) Create the script f3.sh in the /home/yobhel/bin directory on the hadoop101 node

    [yobhel@hadoop101 bin]$ vim f3.sh
    

    Fill the script with the following content

    #!/bin/bash
    
    case $1 in
    "start")
            echo " -------- Starting the hadoop103 business-data Flume --------"
            ssh hadoop103 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
    ;;
    "stop")
    
            echo " -------- Stopping the hadoop103 business-data Flume --------"
            ssh hadoop103 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
    ;;
    esac
    

    (2) Make the script executable

    [yobhel@hadoop101 bin]$ chmod +x f3.sh
    

    (3) Start f3

    [yobhel@hadoop101 module]$ f3.sh start
    

    (4) Stop f3

    [yobhel@hadoop101 module]$ f3.sh stop
    

    4 First-Day Full Sync of the Incremental Tables

    Normally, an incremental table needs one full sync on the first day, followed by incremental sync on each subsequent day. The first-day full sync can be done with Maxwell's bootstrap feature; for convenience, the following script performs the first-day full sync of the incremental tables.
    1) Create mysql_to_kafka_inc_init.sh in the ~/bin directory

    [yobhel@hadoop101 bin]$ vim mysql_to_kafka_inc_init.sh
    

    The script content is as follows

    #!/bin/bash
    
    # This script initializes all incremental tables; it only needs to be run once
    
    MAXWELL_HOME=/opt/module/maxwell
    
    import_data() {
        $MAXWELL_HOME/bin/maxwell-bootstrap --database edu2077 --table $1 --config $MAXWELL_HOME/config.properties
    }
    
    case $1 in
    cart_info | comment_info | favor_info | order_detail | order_info | payment_info | review_info | test_exam | test_exam_question | user_info | vip_change_detail)
      import_data $1
      ;;
    "all")
      for tmp in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail
      do
        import_data $tmp
      done
      ;;
    esac
    

    2) Make mysql_to_kafka_inc_init.sh executable

    [yobhel@hadoop101 bin]$ chmod +x ~/bin/mysql_to_kafka_inc_init.sh
    

    3) Test the sync script
    (1) Clean up historical data
    To make the result easier to inspect, delete the incremental-table data previously synced to HDFS

    [yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f
    

    (2) Run the sync script

    [yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh all 
    
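    The script also accepts a single table name as its argument, which is handy when only one table needs to be re-bootstrapped, for example:

    [yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh order_info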

    4) Check the sync result
    Verify that the incremental-table data reappears on HDFS.

    5 Incremental Table Sync Summary

    Incremental table sync requires one full sync on the first day; only from the second day onward is the sync truly incremental. For the first-day full sync, first start the data pipeline (Maxwell, Kafka, and Flume), then run the first-day sync script mysql_to_kafka_inc_init.sh. On subsequent days, simply keep the collection pipeline running and Maxwell will stream change data to Kafka in real time.
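
    Putting the first-day steps together, the startup order might look like the following sketch (the ZooKeeper and Kafka start commands depend on your own cluster scripts and are only indicated as comments):

    # 1. start ZooKeeper and the Kafka cluster (with your own cluster scripts)
    # 2. start the business-data Flume agent on hadoop103
    [yobhel@hadoop101 ~]$ f3.sh start
    # 3. (re)start Maxwell so it streams binlog changes to topic_db
    [yobhel@hadoop101 ~]$ mxw.sh restart
    # 4. first day only: bootstrap a full sync of all 11 incremental tables
    [yobhel@hadoop101 ~]$ mysql_to_kafka_inc_init.sh all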
