美文网首页Apache Kafka
利用maxwell 组件实时监听Mysql的Binlog日志,并

利用maxwell 组件实时监听Mysql的Binlog日志,并

作者: 简单的菜鸟 | 来源:发表于2018-01-09 19:11 被阅读640次

    一:在linux环境下安装部署好mysql


    1 :开启binlog

         sudo vi /etc/my.cnf (Mysql的配置文件)

    2: mysql的binlog格式有3种,为了把binlog解析成json数据格式,要设置binlog的格式为row(binlog有三种格式:Statement、Row以及Mixed)

    server-id=1

    log-bin=/data/mysql/log/binlog(这一步开启binlog)

    binlog_format=row

    具体如下图:


    3:重启msyql服务

    sudo service mysqld restart

    4:查看是否已经开启binlog

    Mysql>show variables like '%log_bin%';


    此时在/data/mysql/log/binlog目录下可以看到生成了相应的binlog监听日志文件,如图,binlog.000004文件,每次重启msyql服务,就会生成一个新的监听文件,这里日志文件名称跟步骤2中配置的log-bin=/data/mysql/log/binlog有关,如果log-bin=master配置这样,那么日志文件名称就是master.000004。


    二:配置Maxwell相关的部署工作


    1:下载Maxwell

    官网:http://maxwells-daemon.io/

    组件下载链接

    https://github.com/zendesk/maxwell/releases/download/v1.11.0/maxwell-1.11.0.tar.gz

    2:上传maxwell-1.11.0.tar.gz

       通过winscp等FTP工具把maxwell-1.11.0.tar.gz到/usr/local/maxwell/ 目录下

    3:解压maxwell-1.11.0.tar.gz

        tar -zxvf maxwell-1.11.0.tar.gz

    4:创建maxwell使用的mysql用户并赋权

    mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';

    mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';

    以上图片为官网参考

    以我自己的为例:

    GRANT ALL on *.* to 'user01'@'%' identified by 'test123';

    把所有数据库的所有表授权给user01用户以密码test123登录

    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';

    flush privileges;

    (mysql 新设置用户或更改密码后需用flush privileges刷新MySQL的系统权限相关表,否则会出现拒绝访问,还有一种方法,就是重新启动mysql服务器,来使新设置生效。­)

    4::开启maxwell命令行(注意,如果没有设置,maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)

    bin/maxwell --user='maxwell' --password='xxxx' --host='数据库IP地址' --producer=kafka --kafka.bootstrap.servers=bi-master:9092

    解释:数据库IP地址参数是安装mysql的那台主机,最后的kafka.bootstrap.servers是安装kafka集群的节点主机名(最好不要用IP地址)和端口号。

    三:kafka相关配置


    说明(以下我的kafka是安装在主机名叫bi-master,注意kafka里的配置文件端口号要和命令行里给的端口号一致)

    1:首先启动zookeeper

    $sbin/zkServer.sh start

    2:开启kafka命令行

    bin/kafka-server-start.shconfig/server.properties

    3:在kafka中创建一个主题叫maxwell以便于接受数据

    bin/kafka-topics.sh--create --zookeeper bi-master:2181 --replication-factor 1 --partitions 1 --topic maxwell

    4:启动kafka消息生产者窗口

    bin/kafka-console-producer.sh --broker-list bi-master:9092 --topic maxwell

    5:启动kafka消息消费者窗口

    bin/kafka-console-consumer.sh --zookeeper bi-master:2181 --topic maxwell --from-beginning

    四:测试


    1:在Mysql客户工具 中添加一个数据

    测试的表结构,表名称:myTest

    消费窗口接收到的消息:

    {"database":"binlogTest","table":"myTest","type":"insert","ts":1515494531,"xid":7693,"commit":true,"data":{"name":"小梅","sex":"女","age":18,"address":"深圳市南山区海岸城"}}

    2:修改一条数据:

    消费窗口接收到的消息:

    {"database":"binlogTest","table":"myTest","type":"update","ts":1515494707,"xid":7756,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"},"old":{"sex":"女","address":"深圳市南山区海岸城"}}

    3:删除一条数据

    消费窗口接收到的消息:

    {"database":"binlogTest","table":"myTest","type":"delete","ts":1515494807,"xid":7799,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"}}

    五:通过java程序去消费kafka消息数据

    1:jar包添加

    2:代码实现

    第一种实现:

    第二种实现:

    相关文章

      网友评论

      • 老何_da7c:是不是高版本的maxwell对过滤做了调整?我测试的时候,在同一个实例下有几个前缀相同的数据库,例如CRM,CRM1,CRM2。 我设置include_dbs=CRM的时候,在操作其他CRM* 数据库表的时候也触发了往kafka发消息。

        或者命令行启动的时候,该如何设置exclude ,include 规则?

      本文标题:利用maxwell 组件实时监听Mysql的Binlog日志,并

      本文链接:https://www.haomeiwen.com/subject/bosgnxtx.html