美文网首页
参考MySQL Internals手册,使用Golang写一个简

参考MySQL Internals手册,使用Golang写一个简

作者: GreatSQL | 来源:发表于2022-01-12 13:36 被阅读0次
    • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。

    MySQL作为最流行的开源关系型数据库,有大量的拥趸。其生态已经相当完善,各项特性在圈内都有大量研究。每次新特性发布,都会有业界大咖对其进行全面审视、解读、研究,本文要讲的MySQL binlog解析也有很多的前辈开发过优秀的工具进行解析过(例如canal),本文再提旧案未免有造轮子嫌疑。

    但是我作为菜鸟,通过MySQL Internals手册来研究一下MySQL的binlog的协议、event类型、存储格式,并通过MySQL Internals手册的描述来窥探MySQL的数据存储格式,并且学习Golang语言特性,应该还有一定的学习意义。

    所以不揣冒昧,对解析binlog的过程编辑了以下,来梳理我对binlog一知半解,希望不会贻笑大方之家。

    涉及到的工具版本信息:

    IDE:    GoLand 2021.1.1 试用版
    Golang: go1.12.10
    DB:     mysql 8.0.23
    

    MySQL Internals手册:

    https://dev.mysql.com/doc/internals/en/

    Talk is cheap,show the code.

    一、MySQL binlog

    binlog是对数据库执行的所有DDL、DML语句以事件(event)形式记录的的日志,事务安全,目的是用来进行复制和恢复使用,是MySQL重要的特性之一。

    在解析binlog之前数据库需要开启binlog。

    配置如下参数:

    [mysqld]
    log-bin=mysql-bin
    server-id=1
    binlog_format=ROW
    

    注:binlog_format格式还有statement和mixed格式,篇幅所限,这里只讨论RBR情况。

    解析binlog,MySQL提供了mysqlbinlog工具来解析binlog文件。但这里我是通过程序模拟一个从库(slave)角色来解析流式的binlog,所以需要创建账号,用来连接主库和请求binlog。其中:

    • 1、获取主库binlog,必须有REPLICATION SLAVE权限。

    • 2、获取binlog filename和position必须有REPLICATION CLIENT或SUPER权限

    建议的授权:

    CREATE USER mysqlsync@'%' IDENTIFIED BY 'mysqlsync';
    GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'mysqlsync'@'%';
    

    二、解析流程

    解析方式时模拟从库(slave),MySQL的从库在通过ip、port、user、password连接主库后,向主库发送:

    binlogPosition
    binlogFlag
    serverId
    

    然后获取主库binlog。

    程序实现解析binlog首先是连接主库注册slave身份。然后向主库发送COM_BINLOG_DUMP或COM_BINLOG_DUMP_GTID请求binlog。

    COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID的区别在于COM_BINLOG_DUMP_GTID如果发送的通信包内不包含binlog-filename,主库则从已知的第一个binlog发送binlog-stream。

    手册 14.9.6 介绍:

    If the binlog-filename is empty, the server will send the binlog-stream of the first known binlog.

    流程如下:

    • 1、连接Master数据库
      输入数据库IP:PORT,用户名密码连接到Master数据库。

    • 2、设置相关参数
      master_binlog_checksum:MySQL在5.6版本之后为binlog引入了checksum机制,从库需要与主库相关参数保持一致。
      server_id:
      解析程序相当于一个从库,需要向主库发送set注册
      @master_binlog_checksum= @@global.binlog_checksum.

    • 3、注册从节点
      告知客户端的host、port、用户名与密码、serverId 发送COM_BINLOG_DUMP向Master请求binlog stream.

    • 4、接收binlog stream
      接收binlog后,根据event type解析获取数据输出。

    三、解析binlog

    3.1 连接主库

    程序使用tcp协议来连接主库,所以需要构建mysql协议的通信包来与主库进行三次握手。在通信包构建以及连接、发送和读取通信包,调用了kingshard的源码,见:

    https://github.com/flike/kingshard/

    其中,kingshard/mysql包含MySQL的client/server协议的实现,kingshard/backend包含了基于tcp连接的三次握手获取与MySQL的连接以及发送命令,获取并解析结果集等。

    协议内容在手册中可参看:

    https://dev.mysql.com/doc/internals/en/client-server-protocol.html

    程序中调用前先引入:

    import (
        "github.com/flike/kingshard/mysql"
        "github.com/flike/kingshard/backend"
    )
    

    声明一个普通连接函数,返回参数为:连接结构体(struct)以及三个字符串,对应连接,地址,账号,密码。

    func newConn(addr string, user string, pass string) *backend.Conn {
     c := new(backend.Conn)
     if err := c.Connect(addr, user, pass, ""); err != nil {
      fmt.Println("Connect failed ____________________")
     } else {
      fmt.Println("Connect success ____________________")
     }
     return c
    }
    

    有了上面的连接函数,就可以在main函数中调用newConn函数来获取一个连接,地址,账户,密码。

    addr := "127.0.0.1:3306"
    user := "mysqlsync"
    pass := "mysqlsync"
    
    c := newConn(addr, user, pass)
    

    3.2 设置checksum

    MySQL在5.6之后版本为binlog引入了checksum机制,例如crc32,我们的程序作为mysql slave,需要与服务端相关参数保持一致,需要执行:set @master_binlog_checksum= @@global.binlog_checksum 。

    使用获取的连接执行设置checksum:

    _, err := c.Execute("set @master_binlog_checksum= @@global.binlog_checksum")
    if err != nil {
      fmt.Println(err)
    }
    

    3.3 获取binlog当前的binlog_filename,binlog_pos

    向master发送show master status语句查询当前的binlog_filename,binlog_pos。调用的ShowMasterStatus是查询主库当前的binlog_fileneme和binlog_pos。

    show master status结果集的第一行第一列为inlog_file,第一行第二列为binlog_pos,返回结果如下:

    mysql> show master status;
    +---------------+----------+--------------+------------------+--------------------------------------------+
    | File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                          |
    +---------------+----------+--------------+------------------+--------------------------------------------+
    | binlog.000007 |      192 |              |                  | bd3f8dcf-c9d2-11eb-9a2d-080027dcd936:1-278 |
    +---------------+----------+--------------+------------------+--------------------------------------------+
    1 row in set (0.00 sec)
    

    在main函数中添加调用,来接收获取到的binlog_filename,binlog_pos。代码如下:

    binlogFileV, binlogPosV, err := c.ShowMasterStatus("show master status")
    if err != nil {
      panic(err)
    }
    

    其中ShowMasterStatus函数代码为获取结果集的第一行的前两列并返回:

    func (c *Conn) ShowMasterStatus(com string) (string, uint64, error) {
     var res *mysql.Result
     res, err := c.exec(com)
     if err != nil {
      panic(err)
     }
    
     var n = len(res.Resultset.Values)
     if n == 1 {
      fileStr, _ := res.Resultset.GetValue(0, 0)
      posStr, _ := res.Resultset.GetValue(0, 1)
      fileStrV, okf := fileStr.(string)
    
      if !okf {
       panic(err)
      }
    
      posStrV, okp := posStr.(uint64)
      if !okp {
       panic(err)
      }
      return fileStrV, posStrV, nil
     }
     return "", 0, fmt.Errorf("invalid resultset")
    }
    

    3.4 封装COM_BINLOG_DUMP通信packet

    获得了Master的binlog_filename和binlog_pos,通过发送COM_BINLOG_DUMP到Matser请求binlog。在internal手册中,COM_BINLOG_DUMP解释:

    https://dev.mysql.com/doc/internals/en/com-binlog-dump.html

    1              [12] COM_BINLOG_DUMP
    4              binlog-pos
    2              flags
    4              server-id
    string[EOF]    binlog-filename
    

    此时,需要我们自己来封装packet,在组成发送到Master通信的packet中,第1个字节位置存储COM_BINLOG_DUMP标识,第2到第5字节存储binlog-pos。

    第6到第7字节存储slave的server-id,剩余变长字节用来存储binlog-filename。

    在main函数中,创建字节数组,用获取到的binlog-filename、binlog-pos,与COM_BINLOG_DUMP、server-id组成请求binlog通信packet向Master请求binlog。

    在main函数中增加如下代码:

    binlogFile := []byte(binlogFileV)
    var binlogFlag uint16 = 0
    var serverId uint32 = 698148981
    
    length := len(binlogFile)
    data := make([]byte, 1+4+2+4+length)
    
    data[0] = mysql.COM_BINLOG_DUMP
    
    data[1] = byte(binlogPosV & 0xFFFFFFFF) // binlogPosV & 0xFFFFFFFF  uint64 --> uint32
    data[2] = byte((binlogPosV & 0xFFFFFFFF) >> 8)
    data[3] = byte((binlogPosV & 0xFFFFFFFF) >> 16)
    data[4] = byte((binlogPosV & 0xFFFFFFFF) >> 24)
    
    data[5] = byte(binlogFlag)
    data[6] = byte(binlogFlag >> 8)
    
    data[7] = byte(serverId)
    data[8] = byte(serverId >> 8)
    data[9] = byte(serverId >> 16)
    data[10] = byte(serverId >> 24)
    
    copy(data[1+4+2+4:], binlogFile)
    

    至此,请求binlog的packet组装完成,将其发送给Master,Master就会持续不断向本程序通信使用的IP和端口发送binlog network stream,除非遇到命令停止或者网络问题中断。

    在main函数中增加调用BinlogDump函数请求binlog stream的代码:

    c.BinlogDump(data, addr, user, pass)
    

    下面就是main函数调用的请求binlog的函数(此为部分,后续该函数中还将增加解析binlog代码):

    func (c *Conn) BinlogDump(arg []byte, addr string, user string, pass string) ([]byte, error) {
     if err := c.writeCommandBuf(arg[0], arg[1:]); err != nil {
      fmt.Println("WriteCommandBuf failed")
     } else {
      fmt.Println("WriteCommandBuf success")
     }
    }
    

    其中writeCommandBuf函数是根据tcp通信协议封装packet:

    func (c *Conn) writeCommandBuf(command byte, arg []byte) error {
     c.pkg.Sequence = 0
     length := len(arg) + 1
     data := make([]byte, length+4)
     data[4] = command
     copy(data[5:], arg)
     return c.writePacket(data)
    }
    

    3.5 解析event

    MySQL Binlog的event packet分为event header和event data两部分。在不同的Binlog Version中event header长度不同。

    在MySQL 5.0.0+版本中,使用的是Version 4,其event header占用字节长度是19。event data则根据event类型不同,占用字节长度也不尽相同,后文会对部分event进行解析。

    binlog header的组成在internal手册中解释:

    https://dev.mysql.com/doc/internals/en/binlog-event-header.html

    4              timestamp
    1              event type
    4              server-id
    4              event-size
       if binlog-version > 1:
    4              log pos
    2              flags
    

    在binlog的event header之前,使用一个字节位来存储packet标识,包含OK_Packet,EOF_Packet。EOF_Packet用来标识一个查询操作的结束,在MySQL 5.7.5以后的版本已经过期。手册中对此解释:

    These rules distinguish whether the packet represents OK or EOF:

    OK: header = 0 and length of packet > 7

    EOF: header = 0xfe and length of packet < 9

    详细请参考:https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html

    根据手册介绍,我们对event stream进行接收、解析。对上文中的BinlogDump函数进行扩展,golang没有提供while循环,这里使用for循环持续接收binlog event。

    增加的解析event header的代码:

    func (c *Conn) BinlogDump(arg []byte, addr string, user string, pass string) ([]byte, error) {
     if err := c.writeCommandBuf(arg[0], arg[1:]); err != nil {
      fmt.Println("WriteCommandBuf failed")
     } else {
      fmt.Println("WriteCommandBuf success")
     }
    
     for {
      data, err := c.readPacket()
      if err != nil {
       return nil, err
      }
    
      if data[0] == mysql.EOF_HEADER && len(data) < 9 {
       fmt.Println(" ReadEOF Success" + " Length: " + strconv.Itoa(len(data)))
       return nil, nil
      }
    
      if data[0] == mysql.OK_HEADER {
       if _, err := c.handleOKPacket(data); err != nil {
        fmt.Println(" ReadOk failed" + " length: " + strconv.Itoa(len(data)))
        return nil, err
       } else {
        timeStamp := data[1:5]
        eventType := data[5]
        serverId := data[6:10]
        eventSize := data[10:14]
        logPos := data[14:18]
        flags := data[18:20]
       }
      }
     }
    }
    

    这样,就完成了对event header的解析,这里获得的重要信息就是event type,这将决定后续对event的解析方式。

    根据eventType,就可以event data进行解析了。

    MySQL binlog中,共有三十几种event type,在MySQL 8.0.23中,在开启GTID时,执行一个事务,在binlog中会写入以下几种event:

    • GTID_LOG_EVENT:包含last_committed,sequence_number等组提交信息的event。

    • QUERY_EVENT:在binlog_format=statement时,执行的语句即存储在QUERY_EVENT中,例如BEGIN、START TRANSACTION等。

    • ROWS_QUERY_LOG_EVENT:记录原始SQL语句,并包含语句对应的行变更详细内容

    • TABLE_MAP_EVENT:包含事务涉及的表的ID和内部结构定义信息

    • WRITE_ROWS_EVENT_V2:MySQL 5.6.x以后版本的ROWS_EVENT

    • UPDATE_ROWS_EVENT_V2:MySQL 5.6.x以后版本的ROWS_EVENT

    • DELETE_ROWS_EVENT_V2:MySQL 5.6.x以后版本的ROWS_EVENT

    • XID_EVENT:用来标识xa事务的,在两阶段提交中,当执行commit时,会在binlog中记录XID_EVENT

    3.5.1 筛选event(对事务涉及的event进行解析)

    在本文中,着重对:

    • QUERY_EVENT
    • TABLE_MAP_EVENT
    • WRITE_ROWS_EVENT_V2
    • UPDATE_ROWS_EVENT_V2
    • DELETE_ROWS_EVENT_V2

    几个类型的event进行解析,也是事务涉及到的几种event type。

    在BinlogDump函数中的OK_HEADER判断中增加golang的switch代码,来判断过滤event type,便于后面的解析。switch示例如下:

    switch eventType {
      case mysql.XID_EVENT:
        fmt.Printf("EVENT TYPE: %v\n", "XID_EVENT")
      default:
        fmt.Printf("EVENT TYPE: %v\n", "This event will ignored!")
    }
    

    在每一个包含event的packet中,第1个字节为OK_Packet标识(值为0),第2-19字节为event header,而第20以后的字节则为event data部分。

    在程序中增加两个全局变量schema,table:

    var schema string // global para
    var table string  // global para
    

    3.5.2 解析QUERY_EVENT

    首先对 QUERY_EVENT 进行解析。在 QUERY_EVENT 中存储DDL及begin等语句,部分信息略过没有解析。代码解析如下:
    
    case mysql.QUERY_EVENT:
      fmt.Printf("EVENT TYPE: %v\n", "QUERY_EVENT")
      pos := 20
      // threadId := data[20:24]   // 4 bytes.
      // createTime := data[24:28] // 4 bytes.
      pos += 8
      schemaLen := data[pos : pos+1] // 1 byte.
      // errorCode := data[29:31]  // 2 bytes.
      pos += 3
      staVarLen := data[pos : pos+2] // 2 bytes (not present in v1, v3).
      pos += 2
      //statusVar := data[33 : 33+Byte2Int(staVarLen)]                                    
      database := data[pos+utils.Byte2Int(staVarLen) : pos+utils.Byte2Int(staVarLen)+utils.Byte2Int(schemaLen)]
      query := data[pos+utils.Byte2Int(staVarLen)+utils.Byte2Int(schemaLen)+1:]                                 
      schema = string(database)
      sql = string(query)
    
      fmt.Printf("SCHEMA: %v\n", schema)
      fmt.Printf("SQL: %v\n", sql)
    

    解析QUERY_EVENT,主要获取涉及到的DDL等语句。

    3.5.3 解析TABLE_MAP_EVENT

    对TABLE_MAP_EVENT进行解析,获取操作的schema、table。

    MySQL在binlog中的 TABLE_MAP_EVENT 记录了操作涉及的schema和table名,但未记录表的column信息。

    在正常的主从同步中,这些表的column信息是可以从slave的数据字典中查询到的。但是程序模拟的slave则需要通过schema和table名查询Master来获取这些信息。

    当然为了避免每次解析都要查询Master,也可以对其进行缓存。为了减少程序复杂度,本文没有连接Master获取column信息。

    TABLE_MAP_EVENT的Payload信息在手册中描述:

    https://dev.mysql.com/doc/internals/en/table-map-event.html

    post-header:
        if post_header_len == 6 {
      4              table id
        } else {
      6              table id
        }
      2              flags
    
    payload:
      1              schema name length
      string         schema name
      1              [00]
      1              table name length
      string         table name
      1              [00]
      lenenc-int     column-count
      string.var_len [length=$column-count] column-def
      lenenc-str     column-meta-def
      n              NULL-bitmask, length: (column-count + 8) / 7
    

    根据描述我们解析第28个字节获取schema名称长度(schema name length),在然后根据长度解析出schema名称。

    根据手册再顺延顺序向后,用相同的方式解析出table名称,并赋值给全局变量。

    代码参考如下:

    case mysql.TABLE_MAP_EVENT:
      fmt.Printf("EVENT TYPE: %v\n", "TABLE_MAP_EVENT")
      pos := 20
      // tableId := data[20:26]
      pos += 6
      pos += 2
      schemaNameLen := data[pos : pos+1]
      pos += 1 // 1 byte  schema name length
      schema = string(data[pos : pos+utils.Byte2Int(schemaNameLen)])
      pos += utils.Byte2Int(schemaNameLen)
      pos += 1 // 0x00 skip 1 byte
      tableNameLen := data[pos : pos+1]
      pos += 1 // 1 byte  table name length
      table = string(data[pos : pos+utils.Byte2Int(tableNameLen)])
      pos += utils.Byte2Int(tableNameLen) // table name
      pos += 1                            // 0x00 skip 1 byte
    
      columnCount := utils.Byte2Int(data[pos : pos+1])
      pos += 1 // column-count
    
      colType := data[pos : pos+columnCount]
    
      pos += columnCount
    
      // var n int
      var err error
      var metaData []byte
      if metaData, _, _, err = GetColumnMetaArray(data[pos:]); err != nil {
        return err
      }
    
      colMeta, err := GetMeta(metaData, columnCount, colType)
      if err != nil {
        return err
      }
    
      ColumnType = colType
      ColumnMeta = colMeta
      schema = string(schema)
      table = string(table)
    
      fmt.Printf("SCHEMA: %v\n", string(schema))
      fmt.Printf("TABLE: %v\n", string(table))
      fmt.Printf("columnCount: %v\n", columnCount)
      fmt.Println("-----------------")
      for i := 0; i < len(colType); i++ {
        fmt.Printf("ColumnType: %v\n", colType[i])
      }
      fmt.Println("-----------------")
      fmt.Println("&&&&&&&&&&&&&&&&&")
      for i := 0; i < len(colMeta); i++ {
        fmt.Printf("ColumnMeta: %v\n", colMeta[i])
      }
      fmt.Println("&&&&&&&&&&&&&&&&&")
      fmt.Printf("TABLE: %v\n", string(table))
        ```
        
        对TABLE_MAP_EVENT解析主要是获取事务涉及的schema和table信息。
    
    ### 3.5.4 解析ROWS_EVENT
    
    对 ROWS_EVENT 的三个event(
    
    WRITE_ROWS_EVENT_V2 /
    
    UPDATE_ROWS_EVENT_V2 /
    
    DELETE_ROWS_EVENT_V2)进行解析。
    
    手册对event描述如下:
    
    https://dev.mysql.com/doc/internals/en/rows-event.html
        
    

    header:
    if post_header_len == 6 {
    4 table id
    } else {
    6 table id
    }
    2 flags
    if version == 2 {
    2 extra-data-length
    string.var_len extra-data
    }

    body:
    lenenc_int number of columns
    string.var_len columns-present-bitmap1, length: (num of columns+7)/8
    if UPDATE_ROWS_EVENTv1 or v2 {
    string.var_len columns-present-bitmap2, length: (num of columns+7)/8
    }

    rows:
    string.var_len nul-bitmap, length (bits set in 'columns-present-bitmap1'+7)/8
    string.var_len value of each field as defined in table-map
    if UPDATE_ROWS_EVENTv1 or v2 {
    string.var_len nul-bitmap, length (bits set in 'columns-present-bitmap2'+7)/8
    string.var_len value of each field as defined in table-map
    }
    ... repeat rows until event-end
    ```

    ROWS_EVENT的三种event,其packet的格式是相同的。第20到第26字节存储的table id,后续4字节与本文解析日志内容关联不大,跳过。

    第30到31字节存储表中列的数量。其后存储的是SQL语句用到的列,是使用bitmap来存储的。

    在UPDATE_ROWS_EVENT_V2中,存储了更新前后的列的数据,所以使用两个bitmap。bitmap长度按照手册描述为:

    length: (num of columns+7)/8

    列的数量和列是否用到bitmap组成了event的body部分,后面就是event真正存储的数据部分event data了,如果是多行,则循环排列,直至event结束。

    注:这里代码中没有考虑列使用unsigned情况,在实际应用场景中这需要考虑。

    在main函数的对event type的switch判断中,增加如下case:

    case mysql.WRITE_ROWS_EVENT_V2, mysql.UPDATE_ROWS_EVENT_V2, mysql.DELETE_ROWS_EVENT_V2:
         switch eventType {
         case mysql.WRITE_ROWS_EVENT_V2:
          fmt.Printf("EVENT TYPE: %v\n", "WRITE_ROWS_EVENT_V2")
         case mysql.UPDATE_ROWS_EVENT_V2:
          fmt.Printf("EVENT TYPE: %v\n", "UPDATE_ROWS_EVENT_V2")
         case mysql.DELETE_ROWS_EVENT_V2:
          fmt.Printf("EVENT TYPE: %v\n", "DELETE_ROWS_EVENT_V2")
         }
         pos := 20
         // tableId := data[pos : pos+6] // 6 bytes
         pos += 6
         // flags := data[pos:pos+2]    // 2 byte
         pos += 2
         extraDataLen := data[pos : pos+2] // 2 bytes
         pos += utils.Byte2Int(extraDataLen)
    
         columnLen := data[pos : pos+1] // 1 byte
         fmt.Printf("columnLen: %v\n", columnLen)
         pos += 1
         colPreBitmap1 := data[pos : pos+(utils.Byte2Int(columnLen)+7)/8] // columns-present-bitmap1, length: (num of columns+7)/8
         pos += (utils.Byte2Int(columnLen) + 7) / 8
    
         var colPreBitmap2 []byte
         if eventType == mysql.UPDATE_ROWS_EVENT_V2 {
          colPreBitmap2 = data[pos : pos+(utils.Byte2Int(columnLen)+7)/8] // columns-present-bitmap2, length: (num of columns+7)/8
          pos += (utils.Byte2Int(columnLen) + 7) / 8
         }
    
         var rows1 [][]interface{}
         var rows2 [][]interface{}
         for pos < len(data) {
          // repeat rows until event-end
          rows1Re, n1, err := GetRows(rows1, data[pos:], utils.Byte2Int(columnLen), ColumnType, colPreBitmap1, ColumnMeta)
          if err != nil {
           return err
          }
          pos += n1
          for i := 0; i < len(rows1Re); i++ {
           fmt.Printf("ColumnMeta: %v\n", rows1Re[i])
          }
    
          if len(colPreBitmap2) > 0 {
           rows2Re, n2, err := GetRows(rows2, data[pos:], utils.Byte2Int(columnLen), ColumnType, colPreBitmap2, ColumnMeta)
           if err != nil {
            return err
           }
           pos += n2
           for i := 0; i < len(rows2Re); i++ {
            fmt.Printf("ColumnMeta: %v\n", rows2Re[i])
           }
          }
         }
    
    代码中,for pos < len(data)部分就是在解析event body后对真正存储的行数据进行循环解析。
    

    解析是调用GetRows()函数进行的,在GetRows方法中,对不同数据类型,因占用不同字节长度,以及部分类型使用了压缩算法,采用了不同的解码方式。

    解码这部分代码繁杂,枯燥,本文篇幅所限就不介绍了,有兴趣可参考手册以及其他开源工具,例如阿里开源的canal等

    这里对解析结果直接进行了控制台输出,在实际应用中可以格式化为json或其他格式输出,以利于阅读或者应用。

    3.6 测试程序

    在数据库中执行简单sql:

    mysql> use wl
    Database changed
    mysql> update name set first='202106171325' where id = 48;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    

    控制台输出:

    TABLE: name
    EVENT TYPE: UPDATE_ROWS_EVENT_V2
    columnLen: [3]
    ColumnMeta: [48 20210617 <nil>]
    ColumnMeta: [48 202106171325 <nil>]
    EVENT TYPE: XID_EVENT
    

    可以看到UPDATE_ROWS_EVENT_V2记录了更改前后的数据值,列值为空时,记录为<nil>

    至此,对binlog的简单解析就结束了。

    后记

    代码中,对字符串类型转换,通信packet的封装,字节解码分别参考了:

    https://github.com/siddontang/go/hack

    https://github.com/flike/kingshard

    https://github.com/alibaba/canal

    感谢开源社区!

    Enjoy GreatSQL :)

    本文由博客一文多发平台 OpenWrite 发布!

    相关文章

      网友评论

          本文标题:参考MySQL Internals手册,使用Golang写一个简

          本文链接:https://www.haomeiwen.com/subject/cljwcrtx.html