问题
业务反馈线上队列无消费者,但是management管理端发现有Unacked量
![](https://img.haomeiwen.com/i16317810/fd9113360936bb96.png)
基本原理
1、Unacked消息指的是服务端已经投递给消费者,但还没有收到消费者响应Ack这么一个中间状态。
2、消费者如果在消费过程中(尚未给Broker响应Ack)退出,那么消费中的这部分Unacked消息会被requeue到队列,进而投递到其他消费者(如果有Consumers在线);如果没有Consumers,那这部分Unacked消息会转换为Ready状态。
3、Unacked消息状态被Broker维护在内存中。
而上面截图的问题:队列有Unacked量,但是没有Consumers信息,似乎很矛盾。
定位
阅读服务端消费相关模块代码,发现 rabbit_queue_consumers 模块:
计算Unacked消息量
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
读进程字典
all_ch_record() -> [C || {{ch, _}, C} <- get()].
写进程字典
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
rabbit_queue_consumers 模块实际是被 rabbit_amqqueue_process 调用。不难发现队列进程的进程字典维护消息消费过程中
的消费Channel进程等信息。
验证
1、启动Erlang shell
erl -name sj@test -setcookie cookie名
2、获取队列amqqueue记录信息,其中包含进程pid等信息。
rpc:call(N,erlang,apply,[fun()->rabbit_amqqueue:lookup(rabbit_misc:r(<<"vhost名字">>,queue,<<"queue名字">>)) end,[]]).
3、获取队列进程的进程字典信息。Pid为步骤2获取到的进程Pid
rpc:call(node(Pid), erlang, process_info, [Pid,dictionary]).
最终发现了类似如下6条关键信息,与问题中的Unacked量对应:
{{ch,<8232.16445.2870>},
{cr,<8232.16445.2870>,#Ref<8251.0.972816396.42542>,
{[{3,<<"amq.ctag-RkzjrIA60GwE9ED8opKK7w">>}],[]},
0,
{queue,[],[],0},
{qstate,<8232.16442.2870>,dormant,{0,nil}},
1}}
解决
已经能找到上述Unacked状态的6条消息对应的Channel进程,如何让这些消息重入队(成为Ready状态)?
阅读服务端代码,发现 rabbit_amqqueue_process 模块 handle_ch_down/2 方法,该方法内部调用 rabbit_queue_consumers 模块 erase_ch/2 方法,
最终调用 erase/2 将 {ch, ChPid} 这个key从进程字典中移除。
因此,可以通过触发channel进程DOWN
exit(ChPid,normal).
最终队列进程感知Channel进程退出,进而将Unacked状态requeue,最终消息转变为Ready状态~
拓展
如何获取队列Unacked状态消息?
sys:get_state(Pid).
有兴趣可以调试下
网友评论