美文网首页
MySQL Binlog 同步工具go-mysql-transf

MySQL Binlog 同步工具go-mysql-transf

作者: wangjie2016 | 来源:发表于2020-10-28 21:53 被阅读0次

    一、go-mysql-transfer

    go-mysql-transfer是一款MySQL实时、增量数据同步工具。能够实时解析MySQL二进制日志binlog,并生成指定格式的消息,同步到接收端。

    go-mysql-transfer具有如下特点:

    1、不依赖其它组件,一键部署

    2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

    3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

    4、支持监控告警,集成Prometheus客户端

    5、高可用集群部署

    6、数据同步失败重试

    7、全量数据初始化

    详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

    项目开源地址:

    gitee (速度更快) :go-mysql-transfer
    github:go-mysql-transfer

    如果此工具对你有帮助,请Star支持下

    二、Lua脚本引擎

    go-mysql-transfer中使用gopher-lua作为Lua虚拟机,支持Lua5.1规范。Lua作为专业的内置脚本语言,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握其用法。

    基于Lua的高扩展性,可以实现更为复杂的数据解析、消息生成、数据处理逻辑。

    三、json模块

    提供json数据格式的序列化和反序列化功能,提供encode和decode两个方法。
    使用示例如下:

    local json = require("json")   -- 加载json模块
    local ops = require("mqOps") --加载mq操作模块
    
    local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
    local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、delete
    
    local id = row["ID"] --获取ID列的值
    local userName = row["USER_NAME"] --获取USER_NAME列的值
    local password = row["PASSWORD"] --获取USER_NAME列的值
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    
    local result = {}  -- 定义一个table,作为结果
    result["id"] = id
    result["action"] = action
    
    if action == "delete" -- 删除事件
    then
        local val = json.encode(result) -- 将result转为json
        ops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
    else 
        result["userName"] = userName
        result["password"] = password
        result["createTime"] = createTime
        result["source"] = "binlog" -- 数据来源
        local val = json.encode(result) -- 将result转为json
        ops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
        -- local obj = json.decode(val ) -- json反序列化
        -- print(obj ["createTime"])
    

    四、db(数据库操作)模块

    比如我们有角色表(t_role):

    ID CODE NAME REMARK
    1 r1 管理员 具有所有操作权限
    2 r2 测试员 具有测试功能的操作权限

    用户表(t_user):

    ID USER_NAME PASSWORD ROLE_CODE CREATE_TIME
    1 admin 123456 r1 2020-10-20 22:00:10

    我们需要监听t_user表,并向接收端发送如下格式的消息:

    {
           
        "id": "1",
        "userName": "admin"
        "password": "123456",
        "createTime": 100001,
        "roleName": "系统管理员",
        "roleRemark": "管理后台相关信息",
        "source": "binlog",
        
    }
    

    基于Binlog的数据同步工具,只能监听到一行数据的变更,进行响应。无法像基于SQL的ETL工具那样具有多表连接的能力。如果要得到向上面那样的聚合数据,需要使用dbOps模块,用法如下:

    local json = require("json")   -- 加载json模块
    local ops = require("mqOps") --加载mq操作模块
    local db = require("dbOps") --加载数据库(db)操作模块
    
    local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
    -- print(json.encode(row))
    local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、delete
    
    local id = row["ID"] --获取ID列的值
    local userName = row["USER_NAME"] --获取USER_NAME列的值
    local password = row["PASSWORD"] --获取USER_NAME列的值
    local roleCode = row["ROLE_CODE"] --角色编码
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    
    local result = {}  -- 定义一个table,作为结果
    result["id"] = id
    result["action"] = action
    
    if action == "delete" -- 删除事件
    then
        local val = json.encode(result) -- 将result转为json
        ops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
    else 
        
        local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL语句,不能直接使用表名,要使用(数据库名称.表名称),如:ESEAP.T_ROLE
        local roleRS = db.selectOne(sql) -- 执行SQL查询,返回一条查询结果,table类型,结构如:{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}
        -- print(json.encode(roleRS))
        local roleName = roleRS["NAME"] --角色名称
        local roleRemark = roleRS["REMARK"] --角色描述
        
        -- local roleListRS = db.select(sql) -- 执行SQL查询,返回多条条查询结果,数组类型,元素为table,结构如:[{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}]
        -- print(json.encode(roleListRS))
        
        result["userName"] = userName
        result["password"] = password
        result["createTime"] = createTime
        result["source"] = "binlog" -- 数据来源
        result["roleName"] = roleName
        result["roleRemark"] = roleRemark
        local val = json.encode(result) -- 将result转为json
        ops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
    end 
    

    dbOps模块的方法说明:
    1、selectOne(sql) 查询一条数据,返回table类型的结果;如果查询不到数据,返回空table;如果查询到多个结果,会出错
    2、select(sql) 查询多条数据,返回数组类型的结果,数组元素为tablem(格式如:[table1,table2]);查询不到结果,返回空table;

    四、http客户端模块

    让go-mysql-transfer具体发送任意http请求的能力,httpOps提供的方法说明:

    1、get(url,headers) 发送get请求;url为请求地址;headers为请求头参数,table类型
    2、delete(url,headers) 发送delete请求;url为请求地址;headers为请求头参数,table类型
    3、post(url,headers,formItems) 发送post请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型
    4、put(url,headers,formItems) 发送put请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型

    上面4个方法的返回值为一个table类型的结果,元素"status_code"为http响应状态,Number类型(如:200、401、403、500等);元素body为http响应内容,string类型

    httpOps模块具体用法如下:

    local json = require("json")   -- 加载json模块
    local ops = require("redisOps") --加载redis操作模块
    local httpcli = require("httpOps") --加载http操作模块
    
    local row = ops.rawRow()  --数据库当前变更的一行数据,table类型,key为列名称
    local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、delete
    
    local _id = row["ID"] --获取ID列的值
    local _userName = row["USER_NAME"] --获取USER_NAME列的值
    local _password = row["PASSWORD"] --获取USER_NAME列的值
    local _createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    local key = "user_".._id -- 定义key
    
    if action == "insert" -- 插入事件
    then
        -- get
         local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
         local res = httpcli.get(url,{
            Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
         }) -- http get请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table
         local status = res.status_code
        --print(res.status_code)  -- http响应代码,如:200、401、403、500等
        --print(res.body)-- http响应内容,string类型
        --local resObj = json.decode(res.body) -- json反序列化响应内容
        --print(resObj["msg"])
        
        
        -- delete
        --local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
        --local res = httpcli.delete(url,{
        --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
        --}) -- http delete请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table
        
        -- post
        --local url = "http://localhost:9999/http_tests"
        --local res = httpcli.post(url,{
        --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
        --},{
        --  id=_id,
        --  userName=_userName,
        --  password=_password,
        --  createTime=_createTime
        --}) -- http post请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为table
        
        --put
        --local url = "http://localhost:9999/http_tests"
        --local res = httpcli.put(url,{
        --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
        --},{
        --  id=_id,
        --  userName=_userName,
        --  password=_password,
        --  createTime=_createTime
        --}) -- http put请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为table
        
        if status == 200
        then 
            ops.SADD("user_set",userName.."|succeed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为value
        else
            ops.SADD("user_set",userName.."|failed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为value
        end
        
        
    end 
    

    相关文章

      网友评论

          本文标题:MySQL Binlog 同步工具go-mysql-transf

          本文链接:https://www.haomeiwen.com/subject/pgncvktx.html