kafka消费组
0.8版本后offsize存储到了__consumer_offsets队列里,这个队列有50个分区。先看下offsize在__consumer_offsets的存储情况。
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 49 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter"
1、localhost要替换成服务器的ip
2、partition位置的计算方法是:Math.abs(消费组groupID.hashCode()) % 50
指定的消费组,在****__consumer_offsets存储情况****
1.能看出结构是个map,key是topic+partition,value是offsets
2.有过期时间,会有定时定位去压缩清理__consumer_offsets队列,使保持最近的状态
消费组Rebalance
什么情况下会出发rebalance?
1.topic的partition数发生变化(broker宕机或者增加partition)
2.当增加或减少consumer时
比如3个broker,topic有3个partition,有一个consumer group里有3个consumer去消费这个topic。如图所示,此时会分配每个consumer去消费一个partition。当consumer减少为1个时,就会触发rebalance,去消费3个分区。
****rebalance的设计过程是怎样的呢?参考博文****
1。kafka设计里有一个coordinator的协调角色负责rebalance
coordinator是服务端角色程序
每个consumer group对应一个coordinator实例。
coordinator的所在broker的确定,之前说明Math.abs(消费组groupID.hashCode()) % 50能确认consumer group的offsets所在的分区,选这个分区的leader所在机器。
交互的过程简化版,举例子consumer增减的情况。
1.每个consumer定时发送心跳给coordinator,coordinator发现有变化。给所有的consumer发response,协议里说明要重新rebalance了,把自己的消费分区信息发给coordinator。
2.coordinator从所有的consumer里选出一个leader角色(目的是消费组的rebalance过程是设计发生在客户端去做的,这样设计是为了灵活),把所有的consumer消费分区信息发给leader。
3.这个consumer leader决定分配方案,发给coordinator
4.然后coordinator再把具体的分配发给对应的consumer,完成rebalance
请求协调者,协调者指定consumer leader
分配方案,下发方案
上述过程多了个consumer leader角色,使交互变得复杂些,目的就是想让rebalance的过程
交给客户端去做,增加灵活性。实现方式是需要覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。应用场景还
比如可以为consumer挑选同一个机架下的分区数据,减少网络传输的开销。不去覆盖自定义策略的话,Kafka默认为你提供了两种分配策略:range和round-robin。
交互时序编码细节可以看下状态定义,基本跟订单状态类似,系统的实现必要的东西。
kafka消费语义
大致分析了上述流程后。可以思考下,kafka在哪些情况下,会重复消息和丢消息。
从producer broker consumer举几个例子
结论:
kafka在极端情况下会有重复消息和丢消息。
-
Kafka只是能保证at-least once消息语义,即数据是可能重复的,这个需要下游消费者做到幂等性和无状态。
-
除了参数设置和系统级别的极端情况,重复和丢消息很多是kafka consumer使用导致的,一般情况下推荐使用high-level API接口,最好不要直接使用low-level API,自己写起来比较麻烦和困难。
push or pull
producer端是push的方式
consumer端是考虑的长轮训pull的方式
消息队列的实现,要考虑是用推的方式还是拉的方式,推的方式
推的方式能满足实时性,但是如果下游的处理能力达不到的话,就会导致下游服务端异常,本地队列丢消息。
而拉的方式可以避免这个问题,但是缺点是不能达到实时性的需求。
为了保证拉实时性的问题,可以提高拉的频率。但是频率较高的话,对于那些队列里,可能一段时间一直没有消息的场景。又有资源浪费,会有很多无用的请求。
而降低频率,又要妥协实时性。
工程上的做法,是指数递增间隔时间,比如2s,4s,8s,16s...
有消息后,再重复从2s开始。这种做法,在极端情况下(比如1分钟点没消息,正好在1分钟1s来了,这个时候要在2分钟那个点才能拉到消息)。
基于以上分析,kafka采用的是,建立维护网络长连接的方式,长轮训的方式。在没有消息的时候阻塞长链接,建立监听,当有消息到来的时候,唤起长链接拉去消息。
网友评论