首先介绍一下kafka相关的名词:
- broker: 一般一个kafka集群有多个broker节点, broker集群组成了kafka集群
- topic: 逻辑上存在的概念, 数据投递到某个topic
- partition: 一个topic的数据由一个或多个partition保存, 每个partition是一个有序的队列, 同时每个partition都有备份的partition, 一组partition中会选举出一个leader partition用来做读写的操作, 这个leader partition用来跟consumer 和 provider进行数据交互, 这也是leader-follower模式的好处, 能够保证数据的有序性. follower partition会去leader partition中fetch数据. 如果一旦leader partition出现故障, follower partition中还有备份数据, 可以从follower partition中再选举出一个leader partition继续提供服务.当然这里还有一个问题就是, 如果leader partition发生故障以后, 其他follower partition没能fetch到最新的数据, 有一部分数据丢失的话这怎么办? 我们先把问题抛出来, 后面在介绍.
- segment: partition有多个segment组成
- offset: 消息消费进度的偏移值
- consumer: 消息消费者
- consumer group: 消息消费者组, 一条消息可以被多个consumer group消费, 一个consumer group只能有一个consumer消费消息
Kafka 消息机制
Kafka producer 发消息
要保证不丢消息先从消息的源头开始讲起. Producer发送消息有三中方式: Sync(同步), Async(异步), OneWay.通过producer.type来配置, 默认是值sync.
同步对比异步
同步
一条消息发送到broker需要保证所有的相关分区能够收到数据, 能够保证不丢消息, 但是吞吐率会下降.
异步
异步发送到一个缓冲区, 缓存区的数据没写到Broker中就返回给client了.
异步发送的方式可以批量(batch)发送, 这样可以提高broker的性能和吞吐量, 减少网络IO和磁盘请求次数,但是可能会有丢数据的风险.
异步模式有几个参数可以设置:
属性 | 默认值 | 说明 |
---|---|---|
queue.buffering.max.ms | 5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages | 10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | -1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages | 200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量) |
OneWay是异步方式的一种, 他不接收Broker返回的ack值, 只管发, 不管异常.
配置参考:
-
同步:
producer.type=sync request.required.acks=1
-
异步
producer.type=async request.required.acks=1 queue.buffering.max.ms=5000 queue.buffering.max.messages=10000 queue.enqueue.timeout.ms = -1 batch.num.messages=200
-
OneWay
producer.type=async request.required.acks=0
request.required.acks
我们先来看看broker收消息的流程:
- Producer往topic中发消息的时候首先通过zk找到leader partition, 然后往leader partition中发消息.
- leader partition收到消息以后会先将消息写到log文件中, 数据还保存在内存中
- follower 到leader中pull消息后也将数据写到其本地的log文件中, 这个时候数据也只保存在follower的内存中, 还没写到磁盘. 为了提高性能,立刻向leader发送一个ack. leader收到所有follower的ack消息, 则认为这条消息已经commit了, broker将会向producer发送一个ack消息.
上面的三个流程对应三个不同的ack时间节点, producer的ack值有三种配置方式, 我们可以通过request.required.acks
属性来配置:
- 0: producer不等待broker返回确认消息, 这样可以得到最大的吞吐率, 但是可能丢消息
- 1: 等待leader partition保存成功状态返回, 有不错的可靠性
- -1: 等待follower partition都收到数据, 这种方法看起来最可靠, 理论上不会丢消息
除了通过选择Producer发消息的方式以为,Kafka内部也会通过一些机制来保障不丢消息.比如,leader partition还会跟踪follower, 如果一个follower宕机或是落后太多, 将会被移除follower列表.落后太多的定义有两种, 一种是follower复制的消息落后于leader预设的阈值,在server.properties配置中的replica.lag.max.messages
来配置, 另一种是超过一定时间, leader没有收到follower来pull消息的请求, 也可以通过replica.lag.time.max.ms
来配置.
Kafka consumer 消费消息
consumer
Kafka的consumer和partition存在这么一种关系, consumer数量不应该多于partition的设置, 因为多出来的consumer并不能消费partition的消息, consumer也会固定消费某个或是某几个partition的消息, 除非触发rebalance后可能会导致consumer消费的partition发生变化. 基于这样的规则, 我们在配置partition的时候尽可能的将partition设置的大一些, 这样方便我们做consumer的水平扩展.某种程度上来说, partition关系着整个Kafka集群的吞吐率.
offset
Kafka offset记录的是消息的消费进度, Kafka的offset有两种保存方式, 一种是通过配置参数: zookeeper.connect
, 这种情况下, 消费进度会保存到zookeeper下的consumers/{group}/offsets/{topic}/{partition}
目录下. 另一种是通过配置参数: bootstrap.servers
, 这是通过kafka默认api的消费方式, offset会保存在kafka的一个默认topic__consumer_offsets
. 查看当前group的消费进度, 要依靠kafka自带的工具【kafka-consumer-offset-checker】.
提交方式
消息commit的方式有两种, 一种是自动提交, 一种是手动提交.
自动提交带来的问题
自动提交规则是, 消费者会每隔一定的时间间隔来自动提交一次该消费者进程的消费的所有partition的offset(由auto.commit.interval.ms
指定).需要注意的是这种方式可能导致重复消费和丢消息的问题.
重复消费: 当consumer poll了消息, 并消费后, 没有到自动提交offset的时机,这时触发了rebalance, rebalance后这个consumer依然消费原来的partition, 将从上一个offset消费消息, 这个时候就出现了重复消费.
丢消息: 当consumer poll消息后, 业务上并没有处理完, 但是触发了commit的时机, 提交了offset, 恰巧的是出现了crash. 当rebalance以后, 这条消息就被丢掉了, 没能成功消费.
所以, 最好还是采用手动提交的方式来避免rebalance带来的重复消费和丢消息的问题.
手动提交带来的问题
刚刚分析了自动提交的问题, 手动提交也不是没有问题, 手动提交也可能导致一些异常.比如说, 一个consumer进程有两个线程t1,t2 分别消费两个partition分区p1, p2. 当t1消费完消息以后手动commit, 这个时候会提交consumer所有partition的offset, 也就是t2的offset也提交了, 但实际上t2这是可能只是poll了消息并没有消费完成, 如果consumer进程这个时候发生crash, 那么t2消费的消息也会丢到. 目前的kafka控制offset粒度还是一个进程粒度.
那咋整呢?如果consumer数量和partition数量能够一致, 那就不会有这个问题, 说白了就不会有一个consumer消费多个partition的情况. 或者, 我们可以poll消息以后放到一个队列中, 将队列中的所有消息消费掉以后提交一个批量的offset.
网友评论