美文网首页RabbitMQ消息中间件
RabbitMQ队列有Unacked消息,但无消费者信息

RabbitMQ队列有Unacked消息,但无消费者信息

作者: shuangji | 来源:发表于2019-02-26 19:20 被阅读13次

问题

业务反馈线上队列无消费者,但是management管理端发现有Unacked量


基本原理

1、Unacked消息指的是服务端已经投递给消费者,但还没有收到消费者响应Ack这么一个中间状态。
2、消费者如果在消费过程中(尚未给Broker响应Ack)退出,那么消费中的这部分Unacked消息会被requeue到队列,进而投递到其他消费者(如果有Consumers在线);如果没有Consumers,那这部分Unacked消息会转换为Ready状态。
3、Unacked消息状态被Broker维护在内存中。

而上面截图的问题:队列有Unacked量,但是没有Consumers信息,似乎很矛盾。

定位

阅读服务端消费相关模块代码,发现 rabbit_queue_consumers 模块:

计算Unacked消息量

unacknowledged_message_count() ->
    lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).

读进程字典

all_ch_record() -> [C || {{ch, _}, C} <- get()].

写进程字典

store_ch_record(C = #cr{ch_pid = ChPid}) ->
    put({ch, ChPid}, C),
    ok.

rabbit_queue_consumers 模块实际是被 rabbit_amqqueue_process 调用。不难发现队列进程进程字典维护消息消费过程中的消费Channel进程等信息。

验证

1、启动Erlang shell

erl -name sj@test -setcookie   cookie名

2、获取队列amqqueue记录信息,其中包含进程pid等信息。

rpc:call(N,erlang,apply,[fun()->rabbit_amqqueue:lookup(rabbit_misc:r(<<"vhost名字">>,queue,<<"queue名字">>)) end,[]]).

3、获取队列进程进程字典信息。Pid为步骤2获取到的进程Pid

rpc:call(node(Pid), erlang, process_info, [Pid,dictionary]).

最终发现了类似如下6条关键信息,与问题中的Unacked量对应:

{{ch,<8232.16445.2870>},
               {cr,<8232.16445.2870>,#Ref<8251.0.972816396.42542>,
                   {[{3,<<"amq.ctag-RkzjrIA60GwE9ED8opKK7w">>}],[]},
                   0,
                   {queue,[],[],0},
                   {qstate,<8232.16442.2870>,dormant,{0,nil}},
                   1}}

解决

已经能找到上述Unacked状态的6条消息对应的Channel进程,如何让这些消息重入队(成为Ready状态)?

阅读服务端代码,发现 rabbit_amqqueue_process 模块 handle_ch_down/2 方法,该方法内部调用 rabbit_queue_consumers 模块 erase_ch/2 方法,
最终调用 erase/2 将 {ch, ChPid} 这个key从进程字典中移除。

因此,可以通过触发channel进程DOWN

exit(ChPid,normal).

最终队列进程感知Channel进程退出,进而将Unacked状态requeue,最终消息转变为Ready状态~

拓展

如何获取队列Unacked状态消息?

sys:get_state(Pid).

有兴趣可以调试下

相关文章

网友评论

    本文标题:RabbitMQ队列有Unacked消息,但无消费者信息

    本文链接:https://www.haomeiwen.com/subject/pfyzyqtx.html