美文网首页
kafka kafka消费者

kafka kafka消费者

作者: dylan丶QAQ | 来源:发表于2020-10-17 12:55 被阅读0次

    起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,在运用过之后,也去学一学kafka,了解一下他们之间的差别,吃一吃架构方面的相关内容,提升自己。


    1. Kafka消费方式分析

    kafka里consumer采用的是pull的方式从broker里取数据

    • push推的方式很难适应消费速率不同的消费者,消息发送速率是有broker决定的,典型的问题表现是消费端拒绝访问和网络堵塞

    • pull的方式的消费速率是由consumer来确定,如果kafka的topic里没有数据,consumer会长期获取空数据,kafka会在消费时传入一个timeout,如果拉取没有数据,就会等待timeout时长后再返回

    2. Kafka消费分区访问策略

    一个consumer group中有多个consumer,一个topic里有多个partition,这就涉及了partition的分配问题,确定那个partition由哪个consumer来消费

    kafka有三种分配策略:range(范围模式,默认的),roundrobin(均衡),sticky(粘性方式v0.11新增)

    • range:默认的分区消费策略

      无论多少个分区,只有一个消费者,那么所有分区都分配给这个消费者

      每次新的消费者加入消费者组都会触发新的分配

      分配策略:

      • 按照topic进行一组来分配给订阅了这个topic的consumer group中的consumer

      • n=分区数/消费者数量,m=分区数%消费者数量,第一个消费者分配n+m个分区,后面的分配n个分区

        [图片上传失败...(image-8e5c6-1602662975455)]

    # 例1,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有4个分区
    c0: t0p0,t0p1,t1p0,t1p1 
    c1: t0p2,t0p3,t1p2,t1p3
    # 例2,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
    c0: t0p0,t0p1,t1p0,t1p1
    c1: t0p2,t1p2
    
    • roundrobin:负载均衡的方式

      按照消费者组里的消费者进行平均分配

      可以通过配置:partition.assignment.strategy

      class org.apache.kafka.clients.consumer.RoundRobinAssignor

      负载均衡也要看是否订阅了这个topic

      每次新的消费者加入消费者组都会触发新的分配

    # 例1: 假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
    c0: t0p0,t0p2,t1p1
    c1: t0p1,t1p0,t1p2
    # 例2: 3个消费者c0,c1,c2, 有三个topic,每个topic有3个分区,对于消费者而言,c0订阅的t0,c1订阅的t0和t1,c2订阅的t0,t1,t2
    c0: t0p0
    c1: t0p1,t1p0,t1p2
    c2: t0p2,t1p1,t2p0,t2p1,t2p2
    
    • Sticky:粘性策略

      kafka的v0.11版本引入的:class org.apache.kafka.clients.consumer.StickyAssignor

      主要实现的目录

      • 分区的分配要尽可能的均匀

      • 分区的分配尽可能的和上次分配保持一致

      • 当两者冲突时,第一个目标优先第二个目标

    # 例1:三个消费者c0,c1,c2,都订阅了4个主题,t0,t1,t2,t3,每个topic有两个分区
    c0: t0p0,t1p1,t3p0
    c1: t0p1,t2p0,t3p1
    c2: t1p0,t2p1
    # 这个分配很像负载均衡
    

    如果c1退出消费者组

    # roundrobin策略下
    c0: t0p0,t1p0,t2p0,t3p0
    c2: t0p1,t1p1,t2p1,t3p1
    # sticky策略下
    c0: t0p0,t1p1,t3p0,t2p0
    c2: t1p0,t2p1,t0p1,t3p1
    

    3. Springboot整合Kafka进行消息收发

    3.1. Producer发送端

    首先引入POM依赖

     <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     </dependency>
    

    配置producer的yaml

    spring:
     kafka:
     bootstrap-servers: 39.99.222.44:9092
     producer:
     retries: 3 #发送消息的重试次数
     batch-size: 16384
     acks: 1 #等待partition leader落盘完成
     buffer-memory: 33554432 #设置生产者内存缓存的大小
     key-serializer: org.apache.kafka.common.serialization.StringSerializer
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    发送代码编写

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    ​
    import javax.annotation.Resource;
    ​
    @Component
    public class KafkaProducerService {
    ​
     @Resource
     private KafkaTemplate<String,Object> kafkaTemplate;
    ​
     public void sendMessage(String topic,String key,Object object){
     ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,object);
     future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
     @Override
     public void onFailure(Throwable throwable) {
     System.out.println("********消息发送成功:"+throwable.toString());
    ​
     }
     @Override
     public void onSuccess(SendResult<String, Object> result) {
     System.out.println("=========消息发送成功:"+result.toString());
     }
     });
     }
    }
    

    调用测试

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import com.icodingedu.producter.service.KafkaProducerService;
    ​
    @SpringBootTest
    class KafkaProducterApplicationTests {
    ​
     @Autowired
     KafkaProducerService kafkaProducerService;
    ​
     @Test
     void contextLoads() {
     String topic = "topicfirst";
     for(int i=0;i<10;i++){
     kafkaProducerService.sendMessage(topic,"key:"+i,"hello kafka "+i);
     }
     System.out.println("======发送完成======");
     }
    }
    

    在没有设置分区和key的情况下,按照轮询方式写入数据,消费结果如下

    # 读取的值
    hello kafka 1
    hello kafka 6
    hello kafka 2
    hello kafka 7
    hello kafka 3
    hello kafka 8
    hello kafka 4
    hello kafka 9
    hello kafka 0
    hello kafka 5
    Partition: 0    1    2    3    4 
     1 6  2 7  3 8  4 9  0 5
    

    不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!

    相关文章

      网友评论

          本文标题:kafka kafka消费者

          本文链接:https://www.haomeiwen.com/subject/ojrwpktx.html