美文网首页这篇就够了
一文带你快速入门Canal,看这篇就够了!

一文带你快速入门Canal,看这篇就够了!

作者: 大数据老哥 | 来源:发表于2021-01-12 23:37 被阅读0次


    前言

              我们在做实时数仓时数据往往都是保存到数据库中例如MySQL,当有一条数据新增或修改需要马上将数据同步到kafka中或其他的数据库中,这时候我们需要借助阿里开源出来的Canal,来实现我们功能。

    一、什么是Canal

    我们看下官网的描述:

    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    二、Canal能干什么

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

    注意: 当前Canal支持的MySQL版本有 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    三、Canal工作原理

    • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
    • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
    • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

    canal 工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    四、部署Canal

    4.1 安装MySQL

              我之前发过如何部署MySQL我在这就不在写一遍了,如果你的机器中没有安装MySQL那可以去看这篇—> https://blog.csdn.net/qq_43791724/article/details/108196454

    开启MySQL的 binary log 日志

             当我们在安装成功MySQL成功后会有一个my.cnf文件需要添加一下内容

    [mysqld]
    log-bin=/var/lib/mysql/mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

             注意: 当我们在开启了binary log日志模式后会在我们log-bin目录下创建 mysql-bin.* 的文件。当我们数据库中的数据发生改变时就会mysql-bin.*文件中生成记录。

    4.2 安装Canal

    去官下载需要的版本 https://github.com/alibaba/canal/releases我在这里使用的版本为:1.0.24

    1. 将下载好的gz包上传到指定的目录下
    2. 创建个文件夹
    mkdir canal
    1. 解压gz包
    tar -zxvf canal.deployer-1.0.24.tar.gz  -C ../servers/canal/
    1. 配置 canal.properties

    common 属性前四个配置项:

    canal.id= 1
    canal.ip=
    canal.port= 11111
    canal.zkServers=

    canal.id是canal的编号,在集群环境下,不同canal的id不同,注意它和mysql的server_id不同。ip这里不指定,默认为本机,比如上面是192.168.100.201,端口号是11111。zk用于canal cluster。5. 再看下canal.propertiesdestinations相关的配置:

    #################################################
    #########       destinations        ############# 
    #################################################
    canal.destinations = example
    canal.conf.dir = ../conf
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    canal.instance.global.mode = spring 
    canal.instance.global.lazy = false
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml

    这里的canal.destinations = example可以设置多个,比如example1,example2,则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。全局的canal实例管理用spring,这里的file-instance.xml最终会实例化所有的destinations instances:\

    1. 全局的canal实例管理用spring,这里的file-instance.xml最终会实例化所有的destinations instances:
    <!-- properties -->
    <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
     <property name="ignoreResourceNotFound" value="true" />
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
        <property name="locationNames">
         <list>
             <value>classpath:canal.properties</value>                     <value>classpath:${canal.instance.destination:}/instance.properties</value>
             </list>
        </property>
    </bean>

    <bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
       <property name="propertyEditorRegistrars">
        <list>
          <ref bean="socketAddressEditor" />
           </list>
       </property>
    </bean>
    <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
     <property name="destination" value="${canal.instance.destination}" />
        <property name="eventParser">
         <ref local="eventParser" />
        </property>
        <property name="eventSink">
            <ref local="eventSink" />
        </property>
        <property name="eventStore">
            <ref local="eventStore" />
        </property>
        <property name="metaManager">
            <ref local="metaManager" />
        </property>
        <property name="alarmHandler">
            <ref local="alarmHandler" />
        </property>
    </bean>

    比如canal.instance.destination等于example,就会加载example/instance.properties配置文件7. 修改instance 配置文件

    ## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
    canal.instance.mysql.slaveId = 1234

    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=192.168.1.120:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = root
    canal.instance.dbPassword = 123456
    #################################################
    1. 启动
    sh bin/startup.sh
    1. 关闭
    sh bin/stop.sh
    1. 通过jps 查询服务状态
    [root@node01 ~]# jps
    2133 CanalLauncher
    4184 Jps

    到这里说明我们的服务就配好了,这时候我们可以使用java代码创建一个客户端来进行测试

    五、通过Java编写Canal客户端

    5.1 导入依赖

     <dependencies>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.0.24</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
            </dependency>
        </dependencies>

    5.2 编写测试类

    package com.canal.Test;

    /**
     * @author 大数据老哥
     * @version V1.0
     * @Package com.canal.Test
     * @File :CanalTest.java
     * @date 2021/1/11 21:54 */

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
    import java.net.InetSocketAddress;
    import java.util.List;

    /**
     * 测试canal配置是否成功 */
    public class CanalTest {

        public static void main(String[] args) {
            //1.创建连接
            CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201", 11111),
                    "example", "", "");        //指定一次性读取的条数
            int bachChSize = 1000;
            // 设置转态
            boolean running = true;
            while (running) {
                //2.建立连接
                connect.connect();
                //回滚上次请求的信息放置防止数据丢失
                connect.rollback();
                // 订阅匹配日志
                connect.subscribe();
                while (running) {
                    Message message = connect.getWithoutAck(bachChSize);
                    // 获取batchId
                    long batchId = message.getId();
                    // 获取binlog数据的条数
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {

                    } else {
                        printSummary(message);
                    }
                    // 确认指定的batchId已经消费成功
                    connect.ack(batchId);
                }
            }
        }

        private static void printSummary(Message message) {
            // 遍历整个batch中的每个binlog实体
            for (CanalEntry.Entry entry : message.getEntries()) {
                // 事务开始
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                // 获取binlog文件名
                String logfileName = entry.getHeader().getLogfileName();
                // 获取logfile的偏移量
                long logfileOffset = entry.getHeader().getLogfileOffset();
                // 获取sql语句执行时间戳
                long executeTime = entry.getHeader().getExecuteTime();
                // 获取数据库名
                String schemaName = entry.getHeader().getSchemaName();
                // 获取表名
                String tableName = entry.getHeader().getTableName();
                // 获取事件类型 insert/update/delete
                String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();

                System.out.println("logfileName" + ":" + logfileName);
                System.out.println("logfileOffset" + ":" + logfileOffset);
                System.out.println("executeTime" + ":" + executeTime);
                System.out.println("schemaName" + ":" + schemaName);
                System.out.println("tableName" + ":" + tableName);
                System.out.println("eventTypeName" + ":" + eventTypeName);

                CanalEntry.RowChange rowChange = null;
                try {
                    // 获取存储数据,并将二进制字节数据解析为RowChange实体
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }

                // 迭代每一条变更数据
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    // 判断是否为删除事件
                    if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                        System.out.println("---delete---");
                        printColumnList(rowData.getBeforeColumnsList());
                        System.out.println("---");
                    }
                    // 判断是否为更新事件
                    else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                        System.out.println("---update---");
                        printColumnList(rowData.getBeforeColumnsList());
                        System.out.println("---");
                        printColumnList(rowData.getAfterColumnsList());
                    }
                    // 判断是否为插入事件
                    else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                        System.out.println("---insert---");
                        printColumnList(rowData.getAfterColumnsList());
                        System.out.println("---");
                    }
                }
            }
        }
        // 打印所有列名和列值
        private static void printColumnList(List<CanalEntry.Column> columnList) {
            for (CanalEntry.Column column : columnList) {
                System.out.println(column.getName() + "\t" + column.getValue());
            }
        }
    }

    5.3 启动测试

    小结

              今天给大家分享了Canle它的主要的功能做增量数据同步,后面会使用Canle进行做实时数仓。我在这里为大家提供大数据的资源需要的朋友可以去下面GitHub去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

    资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigDataGitee 自行下载  https://gitee.com/li_hey_hey/dashboard/projects


    相关文章

      网友评论

        本文标题:一文带你快速入门Canal,看这篇就够了!

        本文链接:https://www.haomeiwen.com/subject/mbyyaktx.html