美文网首页
kafka-docker上使用+常用指令

kafka-docker上使用+常用指令

作者: 灿烂的GL | 来源:发表于2021-12-09 18:52 被阅读0次

    架构

    生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

    docker中kafka的使用


    启动镜像

    docker run -dit -p 9200:9200 kafka镜像id (端口映射之前有说过)

    启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

    docker run -dit --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.3:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.4:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

    其中172.17.0.3需要改成自己docker的网桥连接地址

    查看已启动容器

    docker ps

    查看所有容器

    docker ps -a

    启动未启动的容器

    docker start 容器id

    进入kafka容器

    docker exec -it 容器id bash(默认进入/目录)

    创建主题

    ./kafka-topics.sh --create --zookeeper 172.17.0.3:2181/kafka --replication-factor 1 --partitions 3 --topic sun

    主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)

    查看topic信息

    ./kafka-topics.sh --describe --zookeeper 172.17.0.3/kafka --topic sun


    集群部署
    可以通过compose集群化部署过es,这里通过创建另一个compose.yml文件来部署kafka,配置文件参考 docker-compose集群部署

    docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d


    spring 中kafka生产消费

    生产者:

    ./kafka-console-producer.sh --bootstrap-server node1:9092 --topic my-kafka-topic

    消费者:
    方式一:从当前主题的迁移量位置+1开始取数据

    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic my-kafka-topic

    方式二:从当前主题第一条消息开始消费

    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic my-kafka-top

    生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
    单播消息:
    当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup --topic my-kafka-topic

    多播消息:
    当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup1 --topic my-kafka-topic
    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup2 --topic my-kafka-topic

    查看消费组详细信息:

    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --describe --group testGroup

    TOPIC   PARTITION  CURRENT-OFFSET   LOG-END-OFFSET  LAG   
    cat       2        79668            79668             0             
    

    CURRENT-OFFSET:最后被消费的偏移量
    LOG-END-OFFSET:消息总量(最后一条消息的偏移量)
    LAG :积压了多少条消息


    常见问题:
    1、如何防止消息丢失
    生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
    消费者:把自动提交改成手动提交
    2、如何防止消息的重复消费
    针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;
    3、消息积压
    消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

    ps:
    缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker


    参考链接:
    1、kafka安装教程--推荐
    2、kafka配置文件server.properties参数说明
    3、创建主题分区数
    4、解决docker容器启动不了的问题
    5、通过docker-compose集群部署
    6、学习视频

    相关文章

      网友评论

          本文标题:kafka-docker上使用+常用指令

          本文链接:https://www.haomeiwen.com/subject/yjxwlltx.html