美文网首页
使用 docker 运行 RocketMQ + Canal +

使用 docker 运行 RocketMQ + Canal +

作者: thepoy | 来源:发表于2021-05-08 16:02 被阅读0次

    0 引言

    在很多业务情况下,我们都会在系统中引入ElasticSearch搜索引擎作为做全文检索的优化方案。

    如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch的代码。

    下面我会以一个blog文章管理为例来演示canal+RocketMQGolang实现MySQLElasticSearch的数据同步。

    示例地址:https://gitee.com/thepoy/RocketMQ_Canal_ElasticSearch_Golang

    尽量不要在 macOS 中使用,创建的容器多多少少会有问题,出问题时很难找到症结所在,而在 linux 系统中使用则一切正常。

    1 RocketMQ

    RocketMQ是没有官方镜像的,所以需要在本地创建:

    cd rocketMQ
    docker build --no-cache -f Dockerfile -t rocketmq:4.8.0 --build-arg version=4.8.0 .
    

    可根据自己的需求对 Dockerfile 进行修改

    修改环境变量文件.env中的主机地址为自己的 ip 地址,然后使用 rocketMQ 目录中的配置文件创建容器:

    docker-compose --file compose.yml up
    

    2 Canal

    2.1 创建容器

    使用项目根目录中的配置文件创建mysqlcanal-admincanal-server容器:

    cd ..
    docker-compose --file compose.yml up
    

    也有一个环境变量文件需要修改,另外,compos 文件中的信息也需要根据需要修改,如 mysql 的 root 密码。

    2.2 为 canal 账号授权

    创建 mysql 容器时也创建了 canal 账号,需要为这个账号授权。

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    

    2.3 打开 canal 管理后台

    http://localhost:8089,打开后需要用admin账号登录,默认密码为123456,管理后台的界面如下图所示:

    image-20210507142335569

    因为 compose.yml 文件中已经配置了 canal-server,所以在后台中能看见已经启动的一个 server。

    2.4 配置实例 / Instance

    点击侧边栏的Instance管理,选择新建 Instance,选择那个唯一的主机,再点击载入模板,修改下面的一些参数:

    # 取消第 3 行中 mysql slaveId 的注释,随便修改为一个数字(不能是 1,因为 mysql 的 server_id=1)
    canal.instance.mysql.slaveId=1234
    # 修改 mysql 的地址,canal-admin 容器中也有一个 mysql 实例,我们不使用这个 mysql,而使用单独的 mysql 容器
    canal.instance.master.address=192.168.31.129:3306
    # 改成自己的数据库信息(需要监听的数据库,新建一个 database 就可以),这一行需要添加
    canal.instance.defaultDatabaseName = blog
    # table regex 需要过滤的表 这里数据库的中所有表
    canal.instance.filter.regex = .\*\\..\*
    # MQ 配置 日志数据会发送到 blog_articles 这个 topic 上
    canal.mq.topic=blog_articles
    

    实例名称随便填一个就行。

    创建好的新实例默认是停止状态,将其启动。

    image-20210507145221376

    创建 database 和 table:

    CREATE DATABASE IF NOT EXISTS `blog`;
    USE blog;
    CREATE TABLE IF NOT EXISTS `blog_articles` (
        `id` INT AUTO_INCREMENT PRIMARY KEY NOT NULL,
        `title` VARCHAR(100) NOT NULL UNIQUE,
        `content` TEXT NOT NULL,
        `created_date` VARCHAR(10) NOT NULL
    );
    

    2.5 配置 canal-server

    image-20210507145359290

    修改下面的参数:

    # 默信是 tcp, 修改为 rocketMQ
    canal.serverMode = rocketMQ
    ##################################################
    #########           RocketMQ         #############
    ##################################################
    rocketmq.producer.group = blog
    rocketmq.namesrv.addr = 192.168.31.129:9876
    

    保存后 server 会重启,这时打开 rocketMQ 控制台,能够看到新增加了一个主题blog_articles

    image-20210507150313195

    可以通过添加一行数据来测试是否成功:

    INSERT INTO blog.blog_articles
    (title, content, created_date)
    VALUES('test1', '这是第 1 个测试文章', '2020-01-01');
    

    添加后,在 rocketMQ 控制台查看消息:

    image-20210507150908030 image-20210507151008624

    可以看到,添加数据的消息已经产生等待消费。

    3 Elasticsearch

    elasticsearch 容器会在使用配置文件创建 Canal 时一同创建,需要注意的是,如果你想修改 elasticsearch 的 tag,可以在.env文件中修改ES_TAG的值。

    我没有创建 Kibana 容器,有需要的话可以自行创建。

    4 代码设计

    当数据库发生变化时,Canal 会将变化信息发送到 RocketMQ 中,所以我们只需要消费 RocketMQ 中的消息就可以做到即时或很快地将变化的数据同步到 Elasticsearch 中。

    4.1 RocketMQ

    常量
    const (
        // topic 在 Canal 中已经配置了,这里一定不能写错
        topic              string = "blog_articles"
        // 消费者组可以自定义,但要与 2.5 节中设置的 rocketmq.producer.group 相同
        consumerGroup      string = "blog"
    )
    

    从环境变量中获取host,并生成server

    var (
        server string
        Host   string
    )
    
    func init() {
        Host = os.Getenv("HOST")
        if Host == "" {
            Host = "localhost"
        }
        server = Host + ":9876"
    }
    
    结构体的设计

    虽然代码中没有用到这个结构体,但我觉得需要拿出来聊一聊:

    type ChangedData struct {
        // 变化的文档集合
        Data []es.Document `json:"data"`
        // 发生变化的数据库
        Database string `json:"database"`
        // 数据库内执行时间
        ES uint64 `json:"es"`
        // 就是 id
        ID uint `json:"id"`
        // 是否为 DDL 语句,create database、create table、alter table
        IsDDL bool `json:"isDdl"`
        // 表结构的字段类型
        MysqlType map[string]string `json:"mysqlType"`
        // 主键名称
        PrimaryKeyNames []string `json:"pkNames"`
        // sql 语句
        SQL string `json:"sql"`
        // sql 语句类型
        SqlType map[string]uint `json:"sqlType"`
        // 表名称
        Table string `json:"table"`
        // 操作类型,(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
        Type string `json:"type"`
        // 数据库内解析时间
        Timestamp uint `json:"ts"`
        // 旧数据
        Old []map[string]string `json:"old"`
    }
    

    其中es.Document结构如下:

    type Document struct {
        ID          string `json:"id,omitempty"`
        Title       string `json:"title,omitempty"`
        Content     string `json:"content,omitempty"`
        CreatedDate string `json:"created_date,omitempty"`
    }
    
    使用第三方 json 库

    这也是为什么没用到上面的结构体的原因。

    使用 json 标准库处理消息数据并同步到 es 中,完全是小题大做,会浪费很多的性能。

    data := gjson.Get(string(msg.body), "data")
    

    使用 gjson 库,可以方便地从 json 字符串中获取想要的数据,并进行后续处理,无需将整个 json 反序列化。

    使用 context 阻塞或退出消费线程

    启动消费订阅后,阻塞多久,就会消费多久,为了能够控制何时结束消费,这里使用contextcancle()函数控制:

        err = c.Start()
        ...
    
        select {
        case <-ctx.Done():
            fmt.Println(strings.Repeat("*", 60))
            fmt.Println("shutdown consumer")
            fmt.Println(strings.Repeat("*", 60))
        }
    
        err = c.Shutdown()
        ...
    

    4.2 Elasticsearch

    es 的代码是通用的,没有特别说明的意义,直接看代码即可。

    4.3 二者结合

    结合 RocketMQ 和 Elasticsearch 的代码,就能完成消息的即时消费文档的即时更新

    需要从消息中取出的数据

    上面的结构体对每个字段都有注释,此示例只取dataoldtype三个字段:

    // 将消息体解析成 gjson.Result
    body := gjson.Parse(string(msg.Body))
    // 从消息体中取 data
    data := body.Get("data").Array()
    // 从消息体中取 old
    old := body.Get("old").Array()
    // 从消息体中取 type
    canalTypeStr := body.Get("type").String()
    
    根据不同的操作以不同的方式更新数据

    本示例中的仅包括非 DDL 操作,仅限于基本的增、删、改,因为数据已同步到 es 中,所以 查 应该在 es 中进行。

    switch canalType {
        case canal.DELETE:
        ...
        case canal.UPDATE:
        ...
        case canal.INSERT:
        ...
        default:
        log.Fatal("未知操作", canalType)
    }
    

    5 操作结果

    设置环境变量(可选操作):

    export HOST=192.168.31.129
    

    运行示例,示例项目在core目录中:

    cd core
    go run main.go
    

    然后在数据库中添加一篇文章:

    INSERT INTO blog.blog_articles
    (title, content, created_date)
    VALUES('test9', '这是第 9 篇测试文章', '2020-01-01');
    

    在终端中就能看见日志:

    ...
    2021/05/08 15:05:41 已创建新的文档: map[content:这是第 9 篇测试文章 created_date:2020-01-01 id:10 title:test9]
    ...
    

    在 es 中查询一下id=10的文档:

    curl -X GET "http://localhost:9200/canal_es/_doc/10?pretty"
    

    查询结果:

    {
      "_index" : "canal_es",
      "_type" : "_doc",
      "_id" : "10",
      "_version" : 1,
      "_seq_no" : 14,
      "_primary_term" : 1,
      "found" : true,
      "_source" : {
        "content" : "这是第 9 篇测试文章",
        "created_date" : "2020-01-01",
        "id" : "10",
        "title" : "test9"
      }
    }
    

    在数据库中更新一下这篇文章的创建日期:

    UPDATE blog.blog_articles
    SET created_date='2009-04-15'
    WHERE id=10;
    

    终端日志:

    2021/05/08 15:15:08 文档已存在,即将更新...
    [200 OK] {"_index":"canal_es","_type":"_doc","_id":"10","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":15,"_primary_term":1}
    2021/05/08 15:15:08 已更新文档:id=10, new-data=map[created_date:2009-04-15]
    

    再查询一下这篇文章信息,结果为:

    {
      "_index" : "canal_es",
      "_type" : "_doc",
      "_id" : "10",
      "_version" : 2,
      "_seq_no" : 15,
      "_primary_term" : 1,
      "found" : true,
      "_source" : {
        "content" : "这是第 9 篇测试文章",
        "created_date" : "2009-04-15",
        "id" : "10",
        "title" : "test9"
      }
    }
    

    可见,创建日期已经更新。

    下面删除这篇文章:

    DELETE FROM blog.blog_articles
    WHERE id=10;
    

    终端日志:

    2021/05/08 15:18:03 即将删除文档  10
    2021/05/08 15:18:03 已删除: {"id":"10","title":"test9","content":"这是第 9 篇测试文章","created_date":"2009-04-15"}
    

    再查询一下这篇文档:

    {
      "_index" : "canal_es",
      "_type" : "_doc",
      "_id" : "10",
      "found" : false
    }
    

    es 中也已删除此文章。


    示例结束。

    相关文章

      网友评论

          本文标题:使用 docker 运行 RocketMQ + Canal +

          本文链接:https://www.haomeiwen.com/subject/zwiudltx.html