在实现事件流流经Kafka时遇到了这个问题,即如何满足消息按produce顺序去consume。
概念&问题
首先,了解Kafka中broker、partition、topic的概念。
- Topic:特指Kafka处理的消息源的不同分类。可理解为字面意“主题”。
- Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的offset。
- Broker:直译“经纪人”,Kafka集群包含一台或多台服务器,服务器则称为broker。
针对最小单位partition,其内部可以保证顺序,但跨partition不保证顺序。也就是说,对某一topic如果增加partition可以增加吞吐能力,但无法保证topic级别的消息的有序性。
好了,那就在broker上新建一个topic: test,producer规规矩矩顺序生产,consumer新起线程消费...但是结果却不如我之前期望的:
Record Key:null, value:SmartHomeRawEvent{id=19, timestamp=1377986492, value=68.451, property=0, plugId=2, householdId=1, houseId=0}, partition:0, offset:28
Record Key:null, value:SmartHomeRawEvent{id=20, timestamp=1377986493, value=33.291, property=1, plugId=2, householdId=1, houseId=0}, partition:0, offset:29
Record Key:null, value:SmartHomeRawEvent{id=1, timestamp=1377986401, value=68.451, property=0, plugId=11, householdId=0, houseId=0}, partition:0, offset:30
Record Key:null, value:SmartHomeRawEvent{id=2, timestamp=1377986402, value=19.927, property=1, plugId=11, householdId=0, houseId=0}, partition:0, offset:31
以上是输出结果节选,可以看出{id=19}'offset=28 > {id=1}'offset=30,这是怎么回事?
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
稍加Google发现另一个Producer的关键参数:max.in.flight.requests.per.connection
,Kafka-2.0.0源码对此参数解释为:
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
这参数我理解是控制同时最大请求数(unack),在>1时且开启了retry,遇到失败请求时,就有可能造成消息重排序。试着设成1,这次消费顺序正常了。
Idempotent Producer
事情还没完,Stack Overflow上对max.in.flight.requests.per.connection
讨论中有人提出开启幂等生产者(Idempotent Producer),即enable.idempotence=true
代替max.in.flight.requests.per.connection=1
,可以解决ordering并保证Exactly Once语义,同时由于最大请求数在最新kafka版本中兼容<=5,还能保证更好的性能。
于是对"幂等"又做了进一步调查,幂等就是指重复操作造成的影响和一次操作一样。这里Idempotent Producer特指重复produce message时只会有唯一一个有效的message,从而保证了Exactly Once而不是At Least Once。注意开启幂等可能会覆盖retry/ack配置;幂等性同样只作用于同一partition。
参数解释如下:
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
看图也很容易理解加幂等的原理,增加了一个对用户不可见的PID,broker只会接收同一个PID&Partition下Seq+1的请求。
未开启幂等
开启幂等
实测了几次确实没有出现消费乱序,不失为较好的保证顺序的方法。实验规模较小,如有错误还请指正。
Ref:
https://hevodata.com/blog/kafka-exactly-once/
https://stackoverflow.com/questions/49802686/understanding-the-max-inflight-property-of-kafka-producer
网友评论