美文网首页
Go操作阿里开源的框架Canal用于MySQL实时binlog同

Go操作阿里开源的框架Canal用于MySQL实时binlog同

作者: 楍跑的鸟 | 来源:发表于2022-09-09 14:24 被阅读0次

    第一章 Canal 服务端搭建

    1.1 认识 Canal 框架

    canal 官方开源github地址: canal ;下载地址:
    canal 官方文档github地址:canal document
    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

    工作原理:

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    1.2 搭建 Canal 服务框架

    1.2.1 启动 MySQL 的 Binlog 功能

    对于系统 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,修改MySQL配置文件my.cnf 或者 my.ini 中的配置,并重启 MySQL 服务;

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    1.2.2 启动 MySQL 的 Binlog 功能

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

    1.2.3 下载配置 canal 服务

    下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.6 版本为例

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
    

    或者直接选择文件下载


    image.png

    解压文件到指定文件夹 canal 下;
    配置修改

    配置文件路径:
    canal\conf\example\instance.properties
    
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*
    

    1.3 启动 Canal 框架

    到 canal 文件下查找 bin 文件夹,startup.bat 就是启动文件。双击 startup.bat 文件即可启动 canal 框架;


    image.png

    如果启动界面无法启动,或者出现一闪而过,说明 canal 框架启动失败;可以在文件 startup.bat 所在文件夹下,启动 powershell,使用命令式执行 startup.bat 文件,可查看运行具体信息。如下所示:


    image.png

    1.4 问题解决

    1.4.1 启动报错 Unrecognized VM option 'PermSize=128m'

    # 错误描述信息
    Unrecognized VM option 'PermSize=128m'
    Error: Could not create the Java Virtual Machine.
    Error: A fatal exception has occurred. Program will exit.
    
    image.png

    搜索查找资料大多数都是说JDK问题,需要安装JDK1.8,配置JAVA_HOME路径到JDK1.8就能解决;此方法,我没有做测试,无法得知是否是可以解决;
    仔细查看报错信息,是JAVA启动虚拟机失败,无法识别 VM 参数,因此是启动参数有问题。
    解决方法:此问题是canal启动参数有问题,修改启动参数即可解决。

    打开canal启动文件:canal\bin\startup.bat
    set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m 
    修改为
    set JAVA_MEM_OPTS= -Xms128m -Xmx512m 
    

    启动canal查看运行情况:


    image.png

    1.4.2 客户端连接canal报错 Debugger failed to attach: timeout during handshake

    [destination = example , address = /127.0.0.1:3306 , EventParser] 
    ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying.
    caused by com.alibaba.otter.canal.parse.exception.CanalParseException: 
    java.io.IOException: connect /127.0.0.1:3306 failure
    
    

    查找多数都是是Java远程调试参数配置问题等等;后来查看canal日志文件,找到问题根本,是canal连接MySQL数据失败导致;
    解决方法:修改canal用户权限;

    查看user数据库中canal用户信息,重新配置用户权限,并更新数据;
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
    

    还有一种情况是用户密码方式有问题;myql8.0版本的密码加密方式为caching_sha2_password,所以修改为mysql_native_password 就行并重新更新密码。这也是我认为有很大可能的另一种情况;值得注意的是,我只测试了修改了用户权限,就解决问题,修改用户密码方式,没有测试,无法知晓是否解决

    方法:
    ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用户密码
    
    FLUSH PRIVILEGES; #刷新权限
    

    第二章 Canal 客户端搭建——Go语言库 canal-go

    canal-go 库开源地址为:canal-go
    你可以直接下载 canal-go 库,使用项目中已经有的例子 canal-go/samples/main.go
    也可创建新的项目,导入 canal-go 库,以下就是第二个方式。步骤如下:

    1、创建项目 dbcanal

    2、go mod init

    3、创建 main.go 脚本

    4、将以下代码复制进去(代码在文章结尾。)

    5、go mod tidy 下载依赖

    6、go run main.go 执行程序查看结果

    image.png

    7、更改数据库中的数据,再查看结果

    MySQL 语句自己按照自己的数据库进行修改,增删改语句都可以进行监听查看。
    INSERT INTO test.meter_base_protocol 
    (name,age)
    VALUES 
    ("lilei",18);
    
    image.png image.png
    到此,所有关于 Go 操作阿里开源的框架 Canal 搭建完成,后续就是按照项目所需要求进行代码开发了, 祝大家好运。
    注意:代码
    package main
    
    import (
        "fmt"
        "github.com/golang/protobuf/proto"
        "github.com/withlin/canal-go/client"
        pbe "github.com/withlin/canal-go/protocol/entry"
        "log"
        "os"
        "time"
    )
    
    func main() {
    
        // 192.168.199.17 替换成你的canal server的地址
        // example 替换成-e canal.destinations=example 你自己定义的名字
        //  该字段名字在 canal\conf\example\meta.dat 文件中,NewSimpleCanalConnector函数参数配置,也在文件中
        /**
          NewSimpleCanalConnector 参数说明
            client.NewSimpleCanalConnector("Canal服务端地址", "Canal服务端端口", "Canal服务端用户名", "Canal服务端密码", "Canal服务端destination", 60000, 60*60*1000)
            Canal服务端地址:canal服务搭建地址IP
            Canal服务端端口:canal\conf\canal.properties文件中
            Canal服务端用户名、密码:canal\conf\example\instance.properties 文件中
            Canal服务端destination :canal\conf\example\meta.dat 文件中
        */
        connector := client.NewSimpleCanalConnector("127.0.0.1", 11111,
            "canal", "canal", "example",
            60000, 60*60*1000)
        err := connector.Connect()
        if err != nil {
            log.Println(err)
            os.Exit(1)
        }
    
        // https://github.com/alibaba/canal/wiki/AdminGuide
        //mysql 数据解析关注的表,Perl正则表达式.
        //
        //多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
        //
        //常见例子:
        //
        //  1.  所有表:.*   or  .*\\..*
        //  2.  canal schema下所有表: canal\\..*
        //  3.  canal下的以canal打头的表:canal\\.canal.*
        //  4.  canal schema下的一张表:canal\\.test1
        //  5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
    
        err = connector.Subscribe(".*\\..*")
        if err != nil {
            log.Println(err)
            os.Exit(1)
        }
    
        for {
    
            message, err := connector.Get(100, nil, nil)
            if err != nil {
                log.Println(err)
                os.Exit(1)
            }
            batchId := message.Id
            if batchId == -1 || len(message.Entries) <= 0 {
                time.Sleep(300 * time.Millisecond)
                fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "===没有数据了===")
                continue
            }
    
            printEntry(message.Entries)
    
        }
    }
    
    func printEntry(entrys []pbe.Entry) {
    
        for _, entry := range entrys {
            if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
                continue
            }
            rowChange := new(pbe.RowChange)
    
            err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
            checkError(err)
            if rowChange != nil {
                eventType := rowChange.GetEventType()
                header := entry.GetHeader()
                fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))
    
                for _, rowData := range rowChange.GetRowDatas() {
                    if eventType == pbe.EventType_DELETE {
                        printColumn(rowData.GetBeforeColumns())
                    } else if eventType == pbe.EventType_INSERT {
                        printColumn(rowData.GetAfterColumns())
                    } else {
                        fmt.Println("-------> before")
                        printColumn(rowData.GetBeforeColumns())
                        fmt.Println("-------> after")
                        printColumn(rowData.GetAfterColumns())
                    }
                }
            }
        }
    }
    
    func printColumn(columns []*pbe.Column) {
        for _, col := range columns {
            fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
        }
    }
    
    func checkError(err error) {
        if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Go操作阿里开源的框架Canal用于MySQL实时binlog同

          本文链接:https://www.haomeiwen.com/subject/piianrtx.html