美文网首页
Canal(增量数据订阅与消费 )快速配置

Canal(增量数据订阅与消费 )快速配置

作者: 匿名的我呀 | 来源:发表于2018-10-09 10:49 被阅读0次

    模拟mysql的主从备份读取bin-log文件的机制,阿里开源项目,实现对mysq的日志文件的解析;

    搭建步骤:

    1.开启mysql的binlog功能,并配置binlog模式为row

    [mysqld]

    log-bin=mysql-bin #添加这一行就ok

    binlog-format=ROW #选择row模式

    server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

    2.在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)

    CREATE USER canal IDENTIFIED BY 'canal'; 

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

    FLUSH PRIVILEGES;

    3. 下载:https://github.com/alibaba/canal/releases

    4.解压,修改配置文件

    mysql serverId,改成唯一的

    vim conf/example/instance.properties

    canal.instance.mysql.slaveId = 129

    position info,需要改成自己的数据库信息

    canal.instance.master.address=192.168.21.134:3306

    username/password,需要改成自己的数据库信息

    canal.instance.dbUsername=canal

    canal.instance.dbPassword=canal

    canal.instance.defaultDatabaseName=test  指定数据库

    canal.instance.connectionCharset=UTF-8

    table regex

    canal.instance.filter.regex=.*\\..*

    5、修改 conf/canal.properties 文件

    canal.ip 改成canal所在机器的ip地址,避免无谓的问题 canal.id= 128

    canal.ip=192.168.21.134:

    canal.port= 11111


    启动

    bin 下的startup脚本

    查看日志 example 下

    运行canal-client实例:

    添加pom依赖:

    <dependency>

        <groupId>com.alibaba.otter</groupId>

        <artifactId>canal.client</artifactId>

        <version>1.0.12</version>

    </dependency>


    import com.alibaba.otter.canal.client.CanalConnector;

    import com.alibaba.otter.canal.client.CanalConnectors;

    import com.alibaba.otter.canal.protocol.Message;

    import java.net.InetSocketAddress;

    /**

    * @ClassName Demo

    * @Description TODO

    * @Author liang

    * @Date 2018\10\9 0009 10:25

    * @Version 1.0

    **/

    public class Demo {

    public static void main(String [] args)throws InterruptedException {

    CanalConnector connector = CanalConnectors.newSingleConnector(

    new InetSocketAddress("192.168.26.134",11111),"example","","");

    connector.connect();

    connector.subscribe(".*\\..*");

    while (true) {

    Message message = connector.getWithoutAck(100);

    long batchId = message.getId();

    System.out.println(batchId);

    if (batchId == -1 || message.getEntries().isEmpty()) {

    Thread.sleep(3000);

    }else {

    System.out.println(message.getEntries());

    connector.ack(batchId);

    }

    }

    }

    }

    相关文章

      网友评论

          本文标题:Canal(增量数据订阅与消费 )快速配置

          本文链接:https://www.haomeiwen.com/subject/byadaftx.html