美文网首页
canal 基于zookeeper的HA安装

canal 基于zookeeper的HA安装

作者: holly_wang_王小飞 | 来源:发表于2016-12-07 16:40 被阅读0次

    我的zookeeper 集群已经安装
    我的mysql已经安装
    两台机器
    10.86.43.154
    10.93.0.192

    cd /home/q/
    sudo mkdir canal
    cd canal/
    sudo wget https://github.com/alibaba/canal/releases/download/canal-1.0.22/canal.deployer-1.0.22.tar.gz
    sudo mkdir canal-deployer
    sudo tar zxvf canal.deployer-1.0.22.tar.gz -C ./canal-deployer
    

    需要 mysql支持复制模式

    sudo vim /etc/my.cnf
    
    [mysqld]
    datadir=/var/lib/mysql
    socket=/var/lib/mysql/mysql.sock
    symbolic-links=0
    log-bin=mysql-bin #添加这一行就ok  
    binlog-format=ROW #选择row模式
    server_id=1#配置mysql replaction需要定义,不能和canal的slaveId重复   
    
    [mysqld_safe]
    log-error=/var/log/mysqld.log
    pid-file=/var/run/mysqld/mysqld.pid
    

    地球人都知道,更新mysql配置my.cnf需要重启mysql才能生效,但是有些时候mysql在线上,不一定允许你重启,这时候应该怎么办呢?(我的已经修改过)

    mysql> show variables like 'binlog_format';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.00 sec)
    mysql> show variables like 'binlog_format';
    +---------------+-----------+
    | Variable_name | Value     |
    +---------------+-----------+
    | binlog_format | STATEMENT |
    +---------------+-----------+
    1 row in set (0.00 sec)
    
    mysql> set binlog_format='ROW';
    Query OK, 0 rows affected (0.00 sec)
    
    mysql> show variables like 'binlog_format';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.00 sec)
    
    

    但是 执行

    mysql> set log_bin='mysql_bin';
    ERROR 1238 (HY000): Variable 'log_bin' is a read only variable
    
    

    开启mysql 日志是必须重启mysql了,gdb修改也不管用

    mysql有以下几种日志:
    错误日志: -log-err
    查询日志: -log
    慢查询日志: -log-slow-queries
    更新日志: -log-update
    二进制日志: -log-bin
    mysql> show variables like  '%log%';
    +-----------------------------------------+---------------------------------+
    | Variable_name                           | Value                           |
    +-----------------------------------------+---------------------------------+
    | back_log                                | 50                              |
    | binlog_cache_size                       | 32768                           |
    | binlog_direct_non_transactional_updates | OFF                             |
    | binlog_format                           | ROW                             |
    | expire_logs_days                        | 0                               |
    | general_log                             | OFF                             |
    | general_log_file                        | /var/run/mysqld/mysqld.log      |
    | innodb_flush_log_at_trx_commit          | 1                               |
    | innodb_locks_unsafe_for_binlog          | OFF                             |
    | innodb_log_buffer_size                  | 1048576                         |
    | innodb_log_file_size                    | 5242880                         |
    | innodb_log_files_in_group               | 2                               |
    | innodb_log_group_home_dir               | ./                              |
    | innodb_mirrored_log_groups              | 1                               |
    | log                                     | OFF                             |
    | log_bin                                 | ON                              |
    | log_bin_trust_function_creators         | OFF                             |
    | log_bin_trust_routine_creators          | OFF                             |
    | log_error                               | /var/log/mysqld.log             |
    | log_output                              | FILE                            |
    | log_queries_not_using_indexes           | OFF                             |
    | log_slave_updates                       | OFF                             |
    | log_slow_queries                        | OFF                             |
    | log_warnings                            | 1                               |
    | max_binlog_cache_size                   | 18446744073709547520            |
    | max_binlog_size                         | 1073741824                      |
    | max_relay_log_size                      | 0                               |
    | relay_log                               |                                 |
    | relay_log_index                         |                                 |
    | relay_log_info_file                     | relay-log.info                  |
    | relay_log_purge                         | ON                              |
    | relay_log_space_limit                   | 0                               |
    | slow_query_log                          | OFF                             |
    | slow_query_log_file                     | /var/run/mysqld/mysqld-slow.log |
    | sql_log_bin                             | ON                              |
    | sql_log_off                             | OFF                             |
    | sql_log_update                          | ON                              |
    | sync_binlog                             | 0                               |
    +-----------------------------------------+---------------------------------+
    38 rows in set (0.00 sec)
    
    mysql> show variables like 'log_bin';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.00 sec)
    
    

    canal配置

    sudo vim canal.properties
    
    #################################################
    #########               common argument         ############# 
    #################################################
    canal.id= 1
    canal.ip=
    canal.port= 11111
    canal.zkServers=10.86.43.164:4180,10.86.43.154:4180,10.93.0.192:4180
    # flush data to zk
    canal.zookeeper.flush.period = 1000
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## meory store gets mode used MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    
    ## detecing config
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    #################################################
    #########               destinations            ############# 
    #################################################
    canal.destinations= example
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    canal.instance.global.mode = spring 
    canal.instance.global.lazy = false
    #canal.instance.global.manager.address = 127.0.0.1:1099
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    

    修改 instance.properties

    cd example
    sudo vim  instance.properties
    
    #################################################
    ## mysql serverId
    canal.instance.mysql.slaveId = 1235 #两台不一样
    
    # position info
    canal.instance.master.address = 10.93.0.192:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
    
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    
    # username/password
    canal.instance.dbUsername = 用户名
    canal.instance.dbPassword = 密码
    canal.instance.defaultDatabaseName = canal_test
    canal.instance.connectionCharset = UTF-8
    
    # table regex
    canal.instance.filter.regex = .*\\..*
    # table black regex
    canal.instance.filter.black.regex =
    
    #################################################
    
    

    两台做同样的操作
    启动及 查看日志

    cd ../../bin
    sudo ./startup.sh
    cd  /home/q/canal/canal-deployer/logs
    

    可以查看 canal的日志 和对应的instance日志
    canal是看canal是否启动正常
    instance是查看对应的实例是否和mysql的master连接正常

    客户端代码
    新建maven项目
    pom加入依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.sample</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>canal.sample</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>  
            <groupId>com.alibaba.otter</groupId>  
            <artifactId>canal.client</artifactId>  
            <version>1.0.12</version>  
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>13.0</version>
        </dependency>
      </dependencies>
    </project>
    
    

    创建类

    package com.alibaba.otter.canal.sample;
    
    
    import java.net.InetSocketAddress;  
    import java.util.List;  
      
    import com.alibaba.otter.canal.client.CanalConnector;  
    import com.alibaba.otter.canal.common.utils.AddressUtils;  
    import com.alibaba.otter.canal.protocol.Message;  
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
    import com.alibaba.otter.canal.client.*;  
    import org.jetbrains.annotations.NotNull;
    public class App 
    {
         public static void main(String args[]) {  
                // 创建链接  
                CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.93.0.192",  
                        11111), "example", "", "");  
                int batchSize = 1000;  
                int emptyCount = 0;  
                try {  
                    connector.connect();  
                    connector.subscribe(".*\\..*");  
                    connector.rollback();  
                    int totalEmtryCount = 1200;  
                    while (emptyCount < totalEmtryCount) {  
                        Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
                        System.out.println(message);
                        long batchId = message.getId();  
                        int size = message.getEntries().size();  
                        if (batchId == -1 || size == 0) {  
                            emptyCount++;  
                            System.out.println("empty count : " + emptyCount);  
                            try {  
                                Thread.sleep(1000);  
                            } catch (InterruptedException e) {  
                                e.printStackTrace();  
                            }  
                        } else {  
                            emptyCount = 0;  
                            // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  
                            printEntry(message.getEntries());  
                        }  
          
                        connector.ack(batchId); // 提交确认  
                        // connector.rollback(batchId); // 处理失败, 回滚数据  
                    }  
          
                    System.out.println("empty too many times, exit");  
                } finally {  
                    connector.disconnect();  
                }  
            }  
          
            private static void printEntry(@NotNull List<Entry> entrys) {  
                for (Entry entry : entrys) {  
                    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                        continue;  
                    }  
          
                    RowChange rowChage = null;  
                    try {  
                        rowChage = RowChange.parseFrom(entry.getStoreValue());  
                    } catch (Exception e) {  
                        throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                                e);  
                    }  
          
                    EventType eventType = rowChage.getEventType();  
                    System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                            entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                            entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                            eventType));  
          
                    for (RowData rowData : rowChage.getRowDatasList()) {  
                        if (eventType == EventType.DELETE) {  
                            printColumn(rowData.getBeforeColumnsList());  
                        } else if (eventType == EventType.INSERT) {  
                            printColumn(rowData.getAfterColumnsList());  
                        } else {  
                            System.out.println("-------> before");  
                            printColumn(rowData.getBeforeColumnsList());  
                            System.out.println("-------> after");  
                            printColumn(rowData.getAfterColumnsList());  
                        }  
                    }  
                }  
            }  
          
            private static void printColumn(@NotNull List<Column> columns) {  
                for (Column column : columns) {  
                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
                }  
            }  
    }
    
    

    相关文章

      网友评论

          本文标题:canal 基于zookeeper的HA安装

          本文链接:https://www.haomeiwen.com/subject/oandmttx.html