美文网首页
canal 订阅mysql binlog

canal 订阅mysql binlog

作者: c458a5378a5a | 来源:发表于2018-10-31 14:32 被阅读0次

    原理

    canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,
    mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流)
    

    安装mysql(ubuntu):安装mysql的时候会提示设置密码

    # 安装
    sudo apt-get install mysql-server
    sudo apt install mysql-client
    sudo apt install libmysqlclient-dev
    # 设置远程登陆
    重命名
    sudo mv /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/my.cnf
    编辑
    sudo vim /etc/mysql/mysql.conf.d/my.cnf
    注释掉bind-address = 127.0.0.1:
    开启binlog
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=11
    # 登陆
    mysql -uroot -p123456
    授权
    grant all on *.* to root@'%' identified by '123456' with grant option;
    设置canal用户密码
    GRANT ALL on canal.* to 'canal'@'%' identified by '123456';
    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'canal'@'%';
    flush privileges;
    查看是否开启binlog
    show variables like 'log_bin';
    show variables like 'binlog_format';
    show variables like 'binlog_row_image';
    
    重启mysql
    service mysql restart
    

    下载安装canal

    wget https://github.com/alibaba/canal/releases/download/v1.0.23/canal.deployer-1.0.23.tar.gz
    tar -zxvf canal.deployer-1.0.23.tar.gz
    

    修改canal配置文件

    cd conf/example
    vim instance.properties 
    
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    
    # position info
    canal.instance.master.address = 192.168.126.134:3306
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    
    # username/password
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = 123456
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    
    # table regex
    canal.instance.filter.regex = .*\\..*
    # table black regex
    canal.instance.filter.black.regex =  
    

    启动canal,关闭canal

    ./bin/startup.sh 
    ./bin/stop.sh 
    

    查看启动日志,运行日志

    cat ../canal/canal.log    # 启动成功时日志中包含: start the canal server[172.17.0.1:11111]
    
    cat ../example/example.log
    

    java监控

    pom文件

     <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.0.12</version>
            </dependency>
    

    javafile

    package com.rzj.jdq.back.credit.web.advice;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    /**
     * Minimal canal client demo: connects to a canal server, subscribes to the
     * tables of the {@code tianyan} schema and prints every row change it
     * receives to standard output.
     *
     * <p>The poll loop ends after {@link #MAX_EMPTY_POLLS} consecutive empty
     * polls or as soon as the polling thread is interrupted.
     *
     * Created by work01 on 5/23/18.
     */
    public class CanalTest {

        /** Maximum number of entries requested from the server per poll. */
        private static final int BATCH_SIZE = 1000;

        /** Number of consecutive empty polls tolerated before the demo exits. */
        private static final int MAX_EMPTY_POLLS = 120;

        /** Pause between two polls while the server has no data, in milliseconds. */
        private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

        /**
         * Runs the demo: connect, subscribe, poll until idle or interrupted, disconnect.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            // Canal server address: 172.17.0.1:11111 or 192.168.126.134:11111.
            // "example" is the destination (instance) name; user name and password are empty.
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("172.17.0.1", 11111), "example", "", "");
            int emptyCount = 0;
            boolean interrupted = false;
            try {
                connector.connect();
                // Filter is "<schema>\\.<table regex>": here every table of schema "tianyan".
                connector.subscribe("tianyan\\..*");
                // NOTE(review): presumably resets the client to the last acked position so
                // batches fetched but never acked are delivered again - confirm in canal docs.
                connector.rollback();
                while (!interrupted && emptyCount < MAX_EMPTY_POLLS) {
                    // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        try {
                            Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                        } catch (InterruptedException e) {
                            // Stop polling. The interrupt status is restored only after
                            // disconnect() below, because blocking channel I/O can fail on
                            // a thread whose interrupt flag is set.
                            interrupted = true;
                        }
                    } else {
                        emptyCount = 0;
                        printEntry(message.getEntries());
                    }
                    // Confirm the batch. If printEntry throws, this line is skipped and the
                    // batch stays un-acked (connector.rollback(batchId) would be the explicit
                    // way to hand a failed batch back).
                    connector.ack(batchId);
                }
                if (interrupted) {
                    System.out.println("interrupted, exit");
                } else {
                    System.out.println("empty too many times, exit");
                }
            } finally {
                try {
                    connector.disconnect();
                } finally {
                    if (interrupted) {
                        // Hand the interrupt back to whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /**
         * Prints a header line plus the affected columns for every row-change entry.
         * Transaction begin/end markers are skipped.
         *
         * @param entrys entries of one batch as returned by {@code Message.getEntries()}
         * @throws RuntimeException if an entry's payload cannot be parsed as a row change
         */
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(
                            "ERROR ## parser of eromanga-event has an error,data:"
                                    + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                CanalEntry.Header header = entry.getHeader();
                System.out.println(String.format(
                        "================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
                        header.getLogfileName(), header.getLogfileOffset(),
                        header.getSchemaName(), header.getTableName(),
                        eventType));

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        // A deleted row only has a "before" image.
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        // An inserted row only has an "after" image.
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        // Every other event type (e.g. UPDATE): show both images.
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        /**
         * Prints one line per column: name, value and the column's "updated" flag.
         *
         * @param columns columns of a single row image
         */
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue()
                        + "    update=" + column.getUpdated());
            }
        }

    }
    

    update后运行结果

    -------> before
    id : 1460    update=false
    mobilephones : 15216731250    update=false
    emails : wangys@rongzhijia.com    update=false
    content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。    update=false
    create_time : 2018-06-07 15:34:07    update=false
    update_time : 2018-05-23 13:46:17    update=false
    -------> after
    id : 1460    update=false
    mobilephones : 15216731250    update=false
    emails : wangys@rongzhijia.com    update=false
    content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。    update=false
    create_time : 2018-06-07 15:34:07    update=false
    update_time : 2018-05-09 13:46:17    update=true
    

    canal HA 搭建

    相关文章

      网友评论

          本文标题:canal 订阅mysql binlog

          本文链接:https://www.haomeiwen.com/subject/ruyrjftx.html