需求
生产者 Producer 向一个
队列发送消息,并且为消息打上不同的 Tag。假设这个队列有3
个消费者(Consumer #[1:3]),Consumer #1 只想消费 tag1
标记的消息,Consumer #2 只想消费 tag2
标记的消息,Consumer #3 只想消费 tag3
标记的消息。
实现分析
Producer 消息打 Tag
生产者 publish 消息时,将 Tag 保存在 Map<String, Object>
类型的 header 字段,作为构建 AMQP.BasicProperties
参数
Consumer 指定消费 Tag 类型
消费者如何告知 Broker 只消费特定 Tag?
假设 Consumer #1 只希望消费带 tag1
标记的消息,那么 Consumer #1 可以在向 Broker 请求 Basic.Consume
指令时,捎带自己期望的 Tag 字符串。Client 在具体生成 consumerTag 时可以用 Tag 关键字加上随机字符串(避免 consumerTag 重复):
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
服务端如何过滤
消费者通过 Basic.Consume 指令来监听队列的消息,这些消费者信息服务端是如何存储的?
保存在队列主进程(Pid)的 state 中(具体调试可以通过 sys:get_state(Pid)
)
%% Queue's state
-record(q, {
%% 省略...
%% consumers state, see rabbit_queue_consumers
consumers,
%% 省略...
}).
并且队列进程在初始化时,会进行 consumers 初始化:
consumers = rabbit_queue_consumers:new(),
consumers 字段实际由 priority_queue:new()
初始化。当有新的 consumer 注册到队列进程,那么会调用 rabbit_queue_consumers 模块的 add_consumer 方法来向 priority_queue 添加一个元素;同理当有 consumer下线时,最终也会调用该模块的 remove_consumer 方法。priority_queue
完整实现见 附二
具体过程
Broker 向 Consumer 投递消息时,底层是通过 rabbit_amqqueue_process 调用 rabbit_queue_consumers 模块的 deliver 方法。默认采用
deliver(FetchFun, QName, ConsumersChanged,
State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
State#state{use = update_use(State#state.use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
case deliver_to_consumer(FetchFun, QEntry, QName) of
{delivered, R} ->
{delivered, ConsumersChanged, R,
State#state{consumers = priority_queue:in(QEntry, Priority,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
State#state{consumers = Tail})
end
end.
从 priority_queue 从获取一个 QEntry({ChPid, Consumer}
),然后通过 FetchFun
从队列中获取消息,发送到 Channel 进程(ChPid)
deliver_to_consumer(FetchFun,
#consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
R.
改造思路
在 consumers 不为空的情况下,通过 FetchFun
获取消息,此时可以获取该消息的 header,取出 Tag 值(如果消息打了 Tag 标记),然后通过 priority_queue 的 filter/2 方法
filter(Pred, Q) -> fold(fun(V, P, Acc) ->
case Pred(V) of
true -> in(V, P, Acc);
false -> Acc
end
end, new(), Q).
在 Pred
实现中,我们可以判断当前消息 Tag 值是否被包含在 consumerTag 中,从而可以过滤出消费特定 tag 的consumers,最后向这些 consumers 中的一个发送 Message
消息。
附一
(队列进程 state 中的 consumers 信息例子)
{state,
{queue,
[{<7874.10509.1537>,
{consumer,<<"amq.ctag-VGcFg-OvjkY9xv6jqjXySQ">>,true,10,[]}},
{<7874.3960.1541>,
{consumer,<<"amq.ctag-W52EK0jEsS51bnMdRuNivg">>,true,10,[]}},
{<7874.859.1523>,
{consumer,<<"amq.ctag-NWQwVx3nkUfXmebmmOYzpQ">>,true,10,[]}},
{<7874.32052.1540>,
{consumer,<<"amq.ctag-vE7sryPzzJGvwxMFTsulag">>,true,10,
[]}}],
[{<7874.4076.1541>,
{consumer,<<"amq.ctag-IFb6d2EnIUU7dFgqEqJs2g">>,true,10,[]}},
{<7891.3104.1332>,
{consumer,<<"amq.ctag-jakQZTbwWx3DZQracuuuOQ">>,true,10,[]}},
{<7891.3047.1332>,
{consumer,<<"amq.ctag-j-KUXd4rd_qfSE9h7N_rqQ">>,true,10,[]}},
{<7891.27946.1203>,
{consumer,<<"amq.ctag-zdc87xrG3f7Y3EFHjLTBhw">>,true,10,[]}},
{<7891.29647.1475>,
{consumer,<<"amq.ctag-f3h4rCE6prP9PbkL68rhsg">>,true,10,[]}},
{<7891.2982.1332>,
{consumer,<<"amq.ctag-j6McrXJC1KsLlx0XVjonoA">>,true,10,
[]}}],
10},
{active,-573798462590564,0.009264803170815055}},
附二
(priority_queue 模块实现
rabbit_common)
注
:上述思路建议在测试环境测试,考虑到有可能出现的性能问题,作为一个调研也会有很多工作要做,整个过程会涉及 RabbitMQ 服务端源码改造、编译、打包(rabbitmq-public-umbrella )以及客户端的相关改造,如果能实际尝试下,也会有不小的收获。
网友评论