一个partition只会有队头的batch被发送,sender线程不会对发送中partition的其余batch检查过期,指向同一个broker的多个partition的batch能够合并成一个request发送。其中前两点是由Accumulator里的muted变量来保证,muted指“进行中”的意思。注意是只有muted的partition才不会对其余batch检查过期,在把batch组装成request即将发送时才会把partition添加进muted。如果是新的batch,因为网络抖动长时间(request.timeout.ms)不能组装成request,是会被sender线程检查过期并立即返回失败的callback,打印日志:
TimeoutException: Expiring x record(s) for xxxxxx: 30001 ms has passed since last append
或者
TimeoutException: Expiring x record(s) for xxxxxx: 30001 ms has passed since batch creation plus linger time
同理,发送中partition的batch因为网络抖动发送失败进行retry,实际上是把retry的batch添加到deque的头部等待下一次的request组装,这时已经从muted中退出来。如果因为网络抖动长时间(request.timeout.ms)不能组装成request,该retry的batch也会被sender线程检查过期并立即返回失败的callback,打印日志:
TimeoutException: Expiring x record(s) for xxxxxx: 30001 ms has passed since last attempt plus backoff time
所以说幂等producer保证的无限重试,是指batch在每次retry之后,都能组装成request发送给broker,然后得到broker的成功或失败response,或者因为超时未响应由producer自己给自己构建了一个失败的response,这样才会进行下一轮的retry。对于长时间的网络抖动,幂等producer也不会无限重试的。
如果max.in.flight不为1,sender线程内的guaranteeMessageOrder就为false,也就是说不再使用Accumulator里的muted变量做上述前两点的保护。
因为in.flight=1的关系,同时发送给同一个broker的只能有一个request,所以如果别的partition也有batch需要发往同一个broker,又没有赶上刚发出去的request,只能阻塞等待request返回之后的下一轮request。但sender线程会检查未在发送中的partition的batch,如果发送出去的request因为网络抖动进行retry,等待发送的partition的batch阻塞时间过长,就会被sender线程检查判定为超时未发送,从而立即返回失败的callback,打印日志:
TimeoutException: Expiring x record(s) for xxxxxx: 30001 ms has passed since batch creation plus linger time
有点不同的是,这是后面的batch被前面的request阻塞导致的失败callback。由于我们使用异步时一旦收到失败callbak就会立即producer.close(0)来中止任何后序消息的发送,所以这里由后面的batch的失败callback去关闭producer会导致前面retry的batch不能正确的返回callback,即使基本都会retry成功写入broker但还是会返回失败的callback:
IllegalStateException: Producer is closed forcefully.
由于业务代码根据回调结果来保存发送序号,导致retry的数据会被业务代码重发了一遍。
所以一个producer实例要尽量少的同时对多个partition进行发送,“同时发送”是指producer内部的发送队列里有多个partition的deque等待sender线程发送,一个是异步发送能做到,一个是同步发送时的多线程发送各个partition能做到,所以单线程挨个partition的同步发送是不用担心的。正常来说对一个topic进行异步发送或多线程同步发送是没什么问题的。
同时经过kafka社区大神的认同,上面提到的request发送失败进行retry,很大的原因就是同一个request发送了多个partition的batch。虽然指向同一broker的多个partition的batch可以合并以提高并发效率,但每个batch达到broker之后要被各自不同的follower复制,全部都完成之后才会一起返回一个response。很明显总耗时很容易拉长导致producer超时retry。
producer的异步发送也能做到不重复不丢失。因为sender线程不会对发送中partition的其余batch检查过期,只要不用同一producer异步发送多个partition,就能避免异步发送会被任何后序batch超时异常扰乱的可能。
发送batch前producer会按顺序保存每条消息的callback并在response后按顺序调用,所以partition同时只有一个batch被发送,且发送成功或失败后按batch内消息的严格顺序进行callback回调,全部回调结束后sender线程才会进行下一轮的request组装和batch发送。
业务代码使用producer.ayncSend把消息托管给producer之后就对后序的流程透明,callback是业务代码唯一机会去掌握消息发送的控制,只要在失败的callback里producer.close(0)就能中止下一轮的batch发送,相当于在失败处断开,以便producer重启后从失败处继续执行。所以不用担心异步发送会丢失对发送的控制,认为失败回调只是一次普通的通知。
网友评论