Maxwell + Kafka: Parsing the MySQL binlog

Author: 陌上冰火 | Published 2019-12-09 15:36

    1. Introduction to Maxwell

     Maxwell reads the MySQL binlog and, in real time, writes each row change as JSON to Kafka, Redis, Kinesis, SQS, Pub/Sub, RabbitMQ, files, and other targets.
     Website:  http://maxwells-daemon.io
     Download: https://github.com/zendesk/maxwell

    2. Set the MySQL binlog format to ROW

    2.1 Check whether the binlog is enabled

    mysql> show variables like '%log_bin%';
    
    (Screenshot: binlog configuration)

    2.2 Exit mysql and open the configuration file, /etc/my.cnf (the path on macOS)

    vim /etc/my.cnf
    
    (Screenshot: my.cnf)

    2.3 Set binlog_format to row

    log-bin=mysql-bin
    binlog_format=row
    server-id=1
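
As a quick sanity check of the three settings above, the following is a minimal sketch that scans the text of a config file and reports whether `log-bin`, `binlog_format=row`, and `server-id` are all present. The class name `BinlogConfigCheck` and its methods are hypothetical helpers made up for illustration; they are not part of MySQL or Maxwell, and the parser only handles simple `key=value` lines.

```java
import java.util.HashMap;
import java.util.Map;

public class BinlogConfigCheck {

    // Parse simple "key=value" lines, skipping blanks, comments and [section] headers.
    public static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new HashMap<>();
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")
                    || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            // MySQL accepts '-' and '_' interchangeably in option names, so normalize.
            settings.put(key.replace('-', '_').toLowerCase(), value);
        }
        return settings;
    }

    // True when the binlog is enabled, row-based, and a server id is set.
    public static boolean readyForMaxwell(String cnf) {
        Map<String, String> s = parse(cnf);
        return s.containsKey("log_bin")
                && "row".equalsIgnoreCase(s.get("binlog_format"))
                && s.containsKey("server_id");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog-bin=mysql-bin\nbinlog_format=row\nserver-id=1\n";
        System.out.println(readyForMaxwell(cnf)); // prints "true"
    }
}
```

This only inspects the file; the running server still has to be restarted (next step) before `show variables` reflects the change.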
    

    2.4 Restart MySQL

    service mysqld restart
    

    An explanation of the three binlog formats (STATEMENT, ROW, MIXED):
    https://www.cnblogs.com/xingyunfashi/p/8431780.html

    3. Configure MySQL privileges

    mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
    mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
    mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
    

    4. Run the maxwell command to start listening

    # No filtering
    bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
       --producer=kafka --kafka.bootstrap.servers=localhost:9092
    By default, messages are written to the topic named maxwell.
    
    (Screenshots: command output, successful startup)
    # Filter by database
    /usr/local/maxwell/bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
    --producer=kafka --kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092 \
    --kafka_topic=maxwells  --filter 'exclude: dbName01.*, include: dbName02.*'
    

    For more options, see: http://maxwells-daemon.io/config/
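
To make the effect of the `--filter` rule in the second command easier to reason about, here is a small sketch that models include/exclude rules in plain Java. It is not Maxwell's code. It assumes that rules are applied in order, that a later matching rule overrides an earlier one, and that anything no rule matches is included; please verify this against the Maxwell filter documentation. `FilterSketch` and its methods are hypothetical names, and only database-level rules of the form `db.*` or `*.*` are handled.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterSketch {

    // One database-level rule: include or exclude every table of a database.
    public static final class Rule {
        final boolean include;
        final String database; // "*" matches any database

        Rule(boolean include, String database) {
            this.include = include;
            this.database = database;
        }

        boolean matches(String db) {
            return database.equals("*") || database.equals(db);
        }
    }

    // Parse a spec such as "exclude: dbName01.*, include: dbName02.*".
    public static List<Rule> parse(String spec) {
        List<Rule> rules = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] kv = part.trim().split(":", 2);
            String type = kv[0].trim();
            String target = kv[1].trim();                      // e.g. "dbName01.*"
            String db = target.substring(0, target.indexOf('.'));
            rules.add(new Rule(type.equals("include"), db));
        }
        return rules;
    }

    // Assumption: default is "included"; the last matching rule decides.
    public static boolean includes(List<Rule> rules, String db) {
        boolean result = true;
        for (Rule r : rules) {
            if (r.matches(db)) {
                result = r.include;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rule> rules = parse("exclude: dbName01.*, include: dbName02.*");
        System.out.println(includes(rules, "dbName01")); // prints "false"
        System.out.println(includes(rules, "dbName02")); // prints "true"
        System.out.println(includes(rules, "otherDb"));  // prints "true"
    }
}
```

Under these assumptions, the filter in the command above drops only dbName01, and every other database still reaches Kafka. If the goal is to forward dbName02 alone, a rule list such as `exclude: *.*, include: dbName02.*` would be the shape to try.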

    5. Consume the Kafka topic in code

    # pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>
    </dependencies>
    
    # KafkaConsumer.java
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumer {
    
        public static void main(String[] args) {
            // Connection settings for the Kafka cluster
            Properties prop = new Properties();
    
            prop.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
            prop.put("group.id", "test");
            prop.put("enable.auto.commit", "true");
            prop.put("auto.commit.interval.ms", "1000");
    
            prop.put("key.deserializer",
                 "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
    
            org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(prop);
    
            // Subscribe to the topic Maxwell produces to
            consumer.subscribe(Arrays.asList("maxwell"));
    
            while (true){
                // Poll for new records, waiting up to 100 ms
                ConsumerRecords<String, String> records = consumer.poll(100);
    
                for (ConsumerRecord<String, String> record : records){
                    System.out.println("Consumed record: " + record.value());
                }
            }
        }
    }
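
Each `record.value()` printed by the consumer is one Maxwell JSON document. The sketch below pulls the `database`, `table`, and `type` fields out of such a string so the loop can act on them. The sample message is modeled on the example in Maxwell's documentation, and `MaxwellMessageSketch` is a hypothetical helper. The regex lookup is deliberately naive (no escape handling, first match wins, nested keys are not distinguished from top-level ones), so a proper JSON library would be the safer choice outside a demo.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellMessageSketch {

    // Return the first string value stored under the given key, or null if absent.
    public static String stringField(String json, String key) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Build a one-line summary such as "insert on test.maxwell".
    public static String summarize(String json) {
        return stringField(json, "type") + " on "
                + stringField(json, "database") + "." + stringField(json, "table");
    }

    public static void main(String[] args) {
        // Sample shaped like the insert example in the Maxwell docs.
        String sample = "{\"database\":\"test\",\"table\":\"maxwell\",\"type\":\"insert\","
                + "\"ts\":1449786310,\"xid\":940752,\"commit\":true,"
                + "\"data\":{\"id\":1,\"daemon\":\"Stanislaw Lem\"}}";
        System.out.println(summarize(sample)); // prints "insert on test.maxwell"
    }
}
```

Inside the consumer loop, `summarize(record.value())` could replace the raw print to show only what kind of change arrived and for which table.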
    

    6. Testing

    6.1 Run the code

    6.2 Make changes in MySQL

    (Screenshot: mysql)

    6.3 Check the IDEA console

    (Screenshot: result)
