MySQL binlog + Maxwell + Kafka for non-intrusive data capture

Author: simperLv | Published 2022-01-20 16:23

Preparing MySQL

First, enable the binlog:

  1. Enter the MySQL container:
docker exec -it {container id} /bin/bash
  2. cd /etc/mysql and open my.cnf (you may need to install vim inside the container first):
apt-get update
apt-get install vim
  3. Edit my.cnf:
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
  4. Restart the MySQL server.
  5. Verify:
show variables like '%log_bin%';

Alternatively, if the binlog is already enabled, you can switch its format at runtime (note that log_bin itself cannot be turned on dynamically; it must be set in my.cnf followed by a restart):

mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

Create a Maxwell user and grant it the privileges it needs: full access to its own maxwell schema, plus replication privileges on everything else.

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

# or for running maxwell locally:

mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

Starting Kafka

Kafka was set up in an earlier article; if you haven't installed it yet, see the post on starting Kafka with Docker first.

Starting Maxwell with Docker

Pull the image:

docker pull zendesk/maxwell

Start Maxwell and print the parsed binlog events to the console:

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell'  --password='123456' --host='192.168.92.66' --producer=stdout

Output to Kafka instead:

docker run -d  -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='192.168.92.66' --producer=kafka \
    --kafka.bootstrap.servers='192.168.92.66:9092' --kafka_topic=maxwell --log_level=debug
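The same settings can also live in a Maxwell config.properties file instead of CLI flags — a sketch using the values above; mount it into the container and, if it is not in Maxwell's working directory, point at it with -config:

```
# connection to the MySQL server whose binlog Maxwell tails
user=maxwell
password=123456
host=192.168.92.66

# where to publish the row events
producer=kafka
kafka.bootstrap.servers=192.168.92.66:9092
kafka_topic=maxwell
log_level=debug
```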

Consuming the log messages from Kafka (Java is used here).

Configuration in application.yml:

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://localhost:3307/ficus?characterEncoding=utf-8
  kafka:
    bootstrap-servers: 192.168.92.66:9092
    producer:
      # number of retries after a send error
      retries: 0
      # key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: maxwell
      # key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # don't fail startup when the listened-to topic doesn't exist yet
      missing-topics-fatal: false

The overall test project layout is shown in the screenshot below (the code is rough; polish it if you have the time).

[Screenshot: project structure]

Dependencies used:

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- mybatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatisplus.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>${mybatisplus.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

Testing

We spin up another MySQL instance and create a test table, resdata:

create table resdata
(
    id    int auto_increment
        primary key,
    name  varchar(20) null,
    age   int         null,
    value varchar(20) null
);
After inserting some rows, the parsed events appear in the Maxwell console, and the same messages arrive on the Kafka topic.

[Screenshot: Maxwell console output] [Screenshot: messages received in Kafka]
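A message on the maxwell topic for such an insert follows Maxwell's JSON format: database, table, event type, timestamp, and the row data. The stdlib-only sketch below hard-codes a made-up payload for the resdata table and extracts a few fields naively, just to show the shape of the message; in the real project a Spring @KafkaListener method would receive the String and parse it with fastjson (already in the pom) instead.

```java
// Sketch of handling a Maxwell row event without any extra dependencies.
public class MaxwellEventDemo {

    // Sample payload in Maxwell's JSON format for an INSERT into the
    // resdata table created above (the row values are made up).
    static final String PAYLOAD =
        "{\"database\":\"ficus\",\"table\":\"resdata\",\"type\":\"insert\","
      + "\"ts\":1642665780,\"data\":{\"id\":1,\"name\":\"tom\",\"age\":20,\"value\":\"a\"}}";

    // Naive extraction of a top-level string field ("key":"value").
    static String field(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) return null;
        start += marker.length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        if (!"resdata".equals(field(PAYLOAD, "table")))
            throw new AssertionError("unexpected table");
        if (!"insert".equals(field(PAYLOAD, "type")))
            throw new AssertionError("unexpected type");
        // prints "ficus.resdata -> insert"
        System.out.println(field(PAYLOAD, "database") + "."
            + field(PAYLOAD, "table") + " -> " + field(PAYLOAD, "type"));
    }
}
```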

[Screenshot: end-to-end result]

A mistake I made

The MySQL host passed to Maxwell cannot be localhost here: inside the container, localhost refers to the container itself, not the machine running MySQL. Use the host's LAN IP instead (on Docker Desktop, host.docker.internal also works).

[Screenshot: connection error when using localhost]

Besides Kafka, Maxwell supports other producers such as Redis; see the official site for the full list.

For a deeper dive into Maxwell, see the article MySQL Binlog 解析工具 Maxwell 详解.
