美文网首页
MySQL协议和canal实现

MySQL协议和canal实现

作者: 清幽之地 | 来源:发表于2020-02-28 22:36 被阅读0次

    前言

    前面的文章里,我们了解到 canal 可以从 MySQL 中感知数据的变化。这是因为它模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,从而实现了主从复制。

    正是了解到这一点,笔者有两个问题便一直萦绕于心:

    • 它是如何模拟 MySQL slave 交互协议的?
    • 它又是怎么解析 binlog 日志的呢?

    今天,笔者准备就着这两个问题,扒拉扒拉 canal 的代码,一探究竟。

    一、MySQL 主从复制

    在谈 canal 之前,我们有必要再重温下 MySQL 主从复制的原理。

    image

    总结上图的流程如下:

    • MySQL master 将数据变更写入二进制日志 (binary log , 其中记录叫做二进制日志事件binary log events);
    • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log);
    • MySQL slave 重放 relay log 中的事件,将数据变更反映到自己的数据库。

    二、canal 原理

    image

    上图就很形象的描述了 canal 的角色。它的原理也很简单:

    • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;
    • mysql master收到dump请求,开始推送binary log给slave(也就是canal);
    • canal解析binary log对象(原始为byte流);
    • canal将解析后的对象,根据业务场景,分发到比如 MySQL 、RocketMQ 或者 ES 中。

    三、源码启动

    看完了 MySQL 主从复制和 canal 原理之后,为了方便 debug ,笔者已经在 GitHub Fork 了源码,并导入本地。

    可以找到 com.alibaba.otter.canal.deployer.CanalLauncher 类,它就是 canal 独立版本启动的入口类。

    在这里,直接运行 main 方法即可运行 canal ,和在 /canal/bin/startup.sh 中效果一样。

    事实上,canal 的代码比较多,在架构上又分了很多模块设计,比如事件解析器、事件消费、内存存储、服务实例、元数据、高可用等。

    本文不打算面面俱到介绍每一个的实现,那就得正儿八经写一个 canal 系列才行。主要还是为了开头我们提出的那两个问题。

    四、如何模拟slave ?

    上面我们已经说到,CanalLauncher是canal 启动的入口类。

    运行 main 方法之后, canal 会先做很多准备工作。比如加载配置文件、初始化消息队列、启动 canal Admin、加载Spring配置、注册钩子程序等。

    canal 模拟 slave 协议,是在EventParser模块中开始进行的。

    image

    在 canal 代码中,整个流程简化如下:

    // 开始执行replication
    // 1. 构造Erosa连接
    ErosaConnection erosaConnection = buildErosaConnection();
    // 2. 启动一个心跳线程
    startHeartBeat(erosaConnection);
    // 3. 执行dump前的准备工作
    preDump(erosaConnection);
    erosaConnection.connect();// 链接
    // 查询master serverId
    long queryServerId = erosaConnection.queryServerId();
    if (queryServerId != 0) {
        serverId = queryServerId;
    }
    // 4. 获取binlog最后的位置信息
    EntryPosition position = findStartPosition(erosaConnection);
    final EntryPosition startPosition = position;
    // 加载元数据
    processTableMeta(startPosition);
    // 重新链接,因为在找position过程中可能有状态,需要断开后重建
    erosaConnection.reconnect();
    // 4. 开始dump数据
    erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);
    

    1、握手、验证

    在开始之前,canal 必须先要和 MySQL 服务器建立连接,并完成客户端身份验证。

    在 MySQL 中,连接过程协议如下:

    image

    在代码中,我们看一下它的连接方法:

    image

    其中,negotiate方法是握手协议和客户端验证的具体实现。就是按照 MySQL 的协议规范,通过上面创建的Socket channel来读写网络数据。

    2、dump前的准备

    正确连接到 MySQL 后,在开始执行 dump 指令之前,还要初始化一些配置信息。

    思路就是通过 MySQL 执行器,执行 SQL 语句,获取信息。

    代码就不粘了,不过它们执行的语句如下:

    show variables like 'binlog_format'      #获取binlog format格式
    show variables like 'binlog_row_image'   #获取binlog image格式
    show variables like 'server_id'          #获取matser serverId
    show master status                       #获取binlog名称和position
    

    3、注册slave

    现在开始调用 erosaConnection.dump(binlogfilename,binlogPosition,func)方法,来注册slave和发送dump命令。

    在使用COM_BINLOG_DUMP请求binlog事件之前发送,在主服务器上注册一个从服务器,它的指令是COM_REGISTER_SLAVE

    image

    注册完之后,就是发送dump请求,它的指令是COM_BINLOG_DUMP

    image

    在执行完这段代码后,我们通过show processlist;查看进程,就可以看到这个dump线程的状态。

    id user host db command time state
    139 canal localhost:62901 null Binlog Dump 3 Master has sent all binlog to slave; waiting for more updates

    五、如何解析binlog数据 ?

    在上面章节中,我们已经看到,MySQL主服务器已经接受了 canal 这个从服务器,那么当canal拿到binlog内容后,
    又是怎么解析它的呢?

    首先,还记得在配置MySQL服务器的时候,我们将binlog-format设置为ROW模式,它是基于行的复制。

    binlog中每一个数据变更可以叫做事件,在ROW模式下,有几个主要的事件类型:

    事件 SQL命令 rows 内容
    TABLE_MAP_EVENT null 定义将要更改的表。
    WRITE_ROWS_EVENT 插入 要插入的行数据
    DELETE_ROWS_EVENT 删除 被删除的数据
    UPDATE_ROWS_EVENT 更新 原数据+要更改的数据

    每一次数据的变更,都会触发2个事件,先把要更改的表信息告诉你,然后再告诉你更改的row内容。

    比如TABLE_MAP_EVENT + WRITE_ROWS_EVENT

    canal在接收到binlog数据后,并不会马上把它解析成我们熟悉的JSON数据,而是在发送的时候才开始。

    比如我们选择使用RocketMQ,那么在发送之前才开始将binlog里面的byte数组转化为对象。

    // 并发构造
    EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
    // 串行分区
    List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
    

    在这两个方法里,就完成了byte数组到对象的转化。转化成的FlatMessage对象,就成了我们在消息队列中消费到的数据结构。

    public class FlatMessage implements Serializable {
        private long                      id;
        private String                    database;
        private String                    table;
        private List<String>              pkNames;
        private Boolean                   isDdl;
        private String                    type;
        // binlog executeTime
        private Long                      es;
        // dml build timeStamp
        private Long                      ts;
        private String                    sql;
        private Map<String, Integer>      sqlType;
        private Map<String, String>       mysqlType;
        private List<Map<String, String>> data;
        private List<Map<String, String>> old;
    }
    

    总结

    正如本文开头所言,笔者在刚了解到canal机制的时候,确实觉得很不可思议。

    咦,它是怎么模拟MySQL slave的呢 ? 总觉得是不是有啥黑科技在里面。。。

    事实上,这是源于笔者对MySQL的无知。

    MySQL早就制定好了各种接口协议,怎么连接、验证、注册和dump都明明白白的写在那儿啦。

    正是应了那句话:花开正好,只待君来~

    相关文章

      网友评论

          本文标题:MySQL协议和canal实现

          本文链接:https://www.haomeiwen.com/subject/wzrdhhtx.html