背景
生产中有这么一种场景:业务方会问某一队列当前有哪些应用在发送或消费
方式1:人工通过RMQ插件Management管理端,只能看Channels、Consumers信息,很难从中大量的信息中提取某一队列对应的生产者和消费者相关信息
方式2:通过监控比如Cat看当前该队列MQ相关打点,可行但很容易漏掉。许多Task类型的应用不是时刻都有MQ打点
以上两种方式均很难满足业务要求。
思路
Producer,Consumer 通过 Channel 来和 Broker通信,而Channel则是复用Socket连接。
在Broker实现中,Channel进程(发送或者消费)都会监视(monitor
)队列进程,如下图:
通过rabbitmq_top插件相关模块获取Channel进程信息类似如下:
因此,如果我们能够获取该队列对应的发送和消费Channels进程信息,就可以提取出发送/消费应用的IP。
步骤
1、通过获取队列进程 Pid 的monitored_by
信息,并利用rabbitmq_top插件相关模块从中提取出类型为rabbit_channel的进程 Chs:
{monitored_by,Res} = rpc:call(node(Pid), erlang, process_info, [Pid, monitored_by]),
Chs = lists:foldl(fun(E,Acc) -> case lists:keyfind(type,1,rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of
{type,rabbit_channel} -> [E|Acc];
_ -> Acc end end, [], Res).
2、通过rabbitmq_management插件相关模块方法获取队列进程对应的消费者Channel进程 ConsumerChs:
Consumers4VHost = rpc:call(Node, erlang, apply, [fun() -> rabbit_mgmt_db:get_all_consumers(list_to_binary(Vhost)) end,[]]),
QName = Q#amqqueue.name#resource.name,
Consumers4Q = lists:filter(fun(T) -> case lists:keyfind(queue, 1, T) of
{queue,QRes} -> case lists:keyfind(name, 1, QRes) of
{name,QName} -> true;
_ -> false end;
_ -> false end end, Consumers4VHost),
ConsumerChs = lists:foldl(fun(E,Acc) -> case lists:keyfind(channel_pid, 1, E) of
{channel_pid,Ch} -> [Ch|Acc];
_ -> Acc end end, [], Consumers4Q).
3、生产者Channel进程 ProducerChs:
ProducerChs = Chs -- ConsumerChs.
4、这样我们获取了队列进程对应的生产者和消费者Channel进程列表,接下来从Channel进程信息中提取客户端IP就可以了:
lists:foldl(fun(E,Acc) -> case lists:keyfind(connection_name, 1, rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of
{connection_name,Conn} ->
ConnStr = binary_to_list(Conn),
Ip = string:substr(ConnStr, 1, string:chr(ConnStr, $:) - 1),
[Ip|Acc];
_ -> Acc end end, [], ChsList).
最后通过去重上面的IP列表集合,得到该队列对应的生产者和消费者IP信息,通过调用相关接口反查IP对应的应用名即可。
相关拓展:
1、如何获取队列进程Pid?通过获取队列amqquque
记录:
QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
[Queue] = rpc:call(Node, rabbit_amqqueue, lookup, [[QRes]]).
2、如何调试上述功能?
利用Erlang shell,确保集群节点已启用management、top插件
3、生产环境实践:
Escript方式运行脚本;比较好的方法是以Erlang Web Server的方式运行,并提供HTTP接口
4、其他场景
僵死队列问题、队列进程(热迁移)负载均衡、获取队列进程、delegate进程关键指标(如邮箱大小、占用内存)等,都可以利用Broker/Plugin既有模块代码进行解决,前提需要熟悉服务端相关模块代码,多进行调试。
网友评论