mysql的准备
首先开启binlog日志
- 进入mysql容器
docker exec -it {container id} /bin/bash
- cd /etc/mysql ,vi my.cnf 可能在容器中需要安装 vim
apt-get update
apt-get install vim
- 修改my.cnf
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
- 重启
- 验证
show variables like '%log_bin%'
或者连接数据库开启binlog
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
创建Maxwell用户,并赋予 maxwell 库的一些权限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
# or for running maxwell locally:
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
启动kafka
kafka在之前的文章中已经弄好了,没安装的可以先去看docker启动kafka
docker启动maxwell
拉取镜像
docker pull zendesk/maxwell
启动maxwell,并将解析出的binlog输出到控制台
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='192.168.92.66' --producer=stdout
输出到kafka
docker run -d -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='192.168.92.66' --producer=kafka \
--kafka.bootstrap.servers='192.168.92.66:9092' --kafka_topic=maxwell --log_level=debug
消费kafka中的log信息,这边用的java
application.yml中的配置信息
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
url: jdbc:mysql://localhost:3307/ficus?characterEncoding=utf-8
kafka:
bootstrap-server: 192.168.92.66:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: maxwell
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 但消费者监听的topic不存在时,保证能够是项目启动
missing-topics-fatal: false
测试工程整个截图如下(写得比较辣眼睛,大家有时间的话还是可以写好点)
工程图
用到的依赖
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mybatis -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatisplus.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>${mybatisplus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
测试
我们另起一个mysql,新建了一张测试表collect
create table resdata
(
id int auto_increment
primary key,
name varchar(20) null,
age int null,
value varchar(20) null
);
然后添加数据,可以在maxwell的控制台看到输出的信息
maxwell
kafka收到的信息![kafka]
kafka
效果图
image.png自己的蠢操作
这里mysql的ip不能写localhost,否则容器内没办法找到宿主机mysql
image.png
除了kafka还可以选择redis等,可以点击官网链接看看。
maxwell详解参考MySQL Binlog 解析工具 Maxwell 详解
网友评论