美文网首页
kafka kafka消费者

kafka kafka消费者

作者: dylan丶QAQ | 来源:发表于2020-10-17 12:55 被阅读0次

起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,在运用过之后,也去学一学kafka,了解一下他们之间的差别,吃一吃架构方面的相关内容,提升自己。


1. Kafka消费方式分析

kafka里consumer采用的是pull的方式从broker里取数据

  • push推的方式很难适应消费速率不同的消费者,消息发送速率是有broker决定的,典型的问题表现是消费端拒绝访问和网络堵塞

  • pull的方式的消费速率是由consumer来确定,如果kafka的topic里没有数据,consumer会长期获取空数据,kafka会在消费时传入一个timeout,如果拉取没有数据,就会等待timeout时长后再返回

2. Kafka消费分区访问策略

一个consumer group中有多个consumer,一个topic里有多个partition,这就涉及了partition的分配问题,确定那个partition由哪个consumer来消费

kafka有三种分配策略:range(范围模式,默认的),roundrobin(均衡),sticky(粘性方式v0.11新增)

  • range:默认的分区消费策略

    无论多少个分区,只有一个消费者,那么所有分区都分配给这个消费者

    每次新的消费者加入消费者组都会触发新的分配

    分配策略:

    • 按照topic进行一组来分配给订阅了这个topic的consumer group中的consumer

    • n=分区数/消费者数量,m=分区数%消费者数量,第一个消费者分配n+m个分区,后面的分配n个分区

      [图片上传失败...(image-8e5c6-1602662975455)]

# 例1,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有4个分区
c0: t0p0,t0p1,t1p0,t1p1 
c1: t0p2,t0p3,t1p2,t1p3
# 例2,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
c0: t0p0,t0p1,t1p0,t1p1
c1: t0p2,t1p2
  • roundrobin:负载均衡的方式

    按照消费者组里的消费者进行平均分配

    可以通过配置:partition.assignment.strategy

    class org.apache.kafka.clients.consumer.RoundRobinAssignor

    负载均衡也要看是否订阅了这个topic

    每次新的消费者加入消费者组都会触发新的分配

# 例1: 假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
c0: t0p0,t0p2,t1p1
c1: t0p1,t1p0,t1p2
# 例2: 3个消费者c0,c1,c2, 有三个topic,每个topic有3个分区,对于消费者而言,c0订阅的t0,c1订阅的t0和t1,c2订阅的t0,t1,t2
c0: t0p0
c1: t0p1,t1p0,t1p2
c2: t0p2,t1p1,t2p0,t2p1,t2p2
  • Sticky:粘性策略

    kafka的v0.11版本引入的:class org.apache.kafka.clients.consumer.StickyAssignor

    主要实现的目录

    • 分区的分配要尽可能的均匀

    • 分区的分配尽可能的和上次分配保持一致

    • 当两者冲突时,第一个目标优先第二个目标

# 例1:三个消费者c0,c1,c2,都订阅了4个主题,t0,t1,t2,t3,每个topic有两个分区
c0: t0p0,t1p1,t3p0
c1: t0p1,t2p0,t3p1
c2: t1p0,t2p1
# 这个分配很像负载均衡

如果c1退出消费者组

# roundrobin策略下
c0: t0p0,t1p0,t2p0,t3p0
c2: t0p1,t1p1,t2p1,t3p1
# sticky策略下
c0: t0p0,t1p1,t3p0,t2p0
c2: t1p0,t2p1,t0p1,t3p1

3. Springboot整合Kafka进行消息收发

3.1. Producer发送端

首先引入POM依赖

 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>

配置producer的yaml

spring:
 kafka:
 bootstrap-servers: 39.99.222.44:9092
 producer:
 retries: 3 #发送消息的重试次数
 batch-size: 16384
 acks: 1 #等待partition leader落盘完成
 buffer-memory: 33554432 #设置生产者内存缓存的大小
 key-serializer: org.apache.kafka.common.serialization.StringSerializer
 value-serializer: org.apache.kafka.common.serialization.StringSerializer

发送代码编写

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
​
import javax.annotation.Resource;
​
@Component
public class KafkaProducerService {
​
 @Resource
 private KafkaTemplate<String,Object> kafkaTemplate;
​
 public void sendMessage(String topic,String key,Object object){
 ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,object);
 future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
 @Override
 public void onFailure(Throwable throwable) {
 System.out.println("********消息发送成功:"+throwable.toString());
​
 }
 @Override
 public void onSuccess(SendResult<String, Object> result) {
 System.out.println("=========消息发送成功:"+result.toString());
 }
 });
 }
}

调用测试

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.icodingedu.producter.service.KafkaProducerService;
​
@SpringBootTest
class KafkaProducterApplicationTests {
​
 @Autowired
 KafkaProducerService kafkaProducerService;
​
 @Test
 void contextLoads() {
 String topic = "topicfirst";
 for(int i=0;i<10;i++){
 kafkaProducerService.sendMessage(topic,"key:"+i,"hello kafka "+i);
 }
 System.out.println("======发送完成======");
 }
}

在没有设置分区和key的情况下,按照轮询方式写入数据,消费结果如下

# 读取的值
hello kafka 1
hello kafka 6
hello kafka 2
hello kafka 7
hello kafka 3
hello kafka 8
hello kafka 4
hello kafka 9
hello kafka 0
hello kafka 5
Partition: 0    1    2    3    4 
 1 6  2 7  3 8  4 9  0 5

不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!

相关文章

网友评论

      本文标题:kafka kafka消费者

      本文链接:https://www.haomeiwen.com/subject/ojrwpktx.html