美文网首页消息中间件程序员RabbitMQ
RabbitMQ消息过滤的一个思路

RabbitMQ消息过滤的一个思路

作者: shuangji | 来源:发表于2019-03-26 17:59 被阅读33次

需求

生产者 Producer 向一个队列发送消息,并且为消息打上不同的 Tag。假设这个队列有3个消费者(Consumer #[1:3]),Consumer #1 只想消费 tag1 标记的消息,Consumer #2 只想消费 tag2 标记的消息,Consumer #3 只想消费 tag3 标记的消息。

实现分析

Producer 消息打 Tag

生产者 publish 消息时,将 Tag 保存在 Map<String, Object> 类型的 header 字段,作为构建 AMQP.BasicProperties 参数

Consumer 指定消费 Tag 类型

消费者如何告知 Broker 只消费特定 Tag?

假设 Consumer #1 只希望消费带 tag1 标记的消息,那么 Consumer #1 可以在向 Broker 请求 Basic.Consume 指令时,捎带自己期望的 Tag 字符串。Client 在具体生成 consumerTag 时可以用 Tag 关键字加上随机字符串(避免 consumerTag 重复):

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

服务端如何过滤

消费者通过 Basic.Consume 指令来监听队列的消息,这些消费者信息服务端是如何存储的?

保存在队列主进程(Pid)的 state 中(具体调试可以通过 sys:get_state(Pid)

%% Queue's state
-record(q, {
            %% 省略...
            %% consumers state, see rabbit_queue_consumers
            consumers,
            %% 省略...
           }).

并且队列进程在初始化时,会进行 consumers 初始化:

consumers = rabbit_queue_consumers:new(),

consumers 字段实际由 priority_queue:new()初始化。当有新的 consumer 注册到队列进程,那么会调用 rabbit_queue_consumers 模块的 add_consumer 方法来向 priority_queue 添加一个元素;同理当有 consumer下线时,最终也会调用该模块的 remove_consumer 方法。priority_queue 完整实现见 附二

具体过程

Broker 向 Consumer 投递消息时,底层是通过 rabbit_amqqueue_process 调用 rabbit_queue_consumers 模块的 deliver 方法。默认采用

deliver(FetchFun, QName, ConsumersChanged,
        State = #state{consumers = Consumers}) ->
    case priority_queue:out_p(Consumers) of
        {empty, _} ->
            {undelivered, ConsumersChanged,
             State#state{use = update_use(State#state.use, inactive)}};
        {{value, QEntry, Priority}, Tail} ->
            case deliver_to_consumer(FetchFun, QEntry, QName) of
                {delivered, R} ->
                    {delivered, ConsumersChanged, R,
                     State#state{consumers = priority_queue:in(QEntry, Priority,
                                                               Tail)}};
                undelivered ->
                    deliver(FetchFun, QName, true,
                            State#state{consumers = Tail})
            end
    end.

从 priority_queue 从获取一个 QEntry({ChPid, Consumer}),然后通过 FetchFun 从队列中获取消息,发送到 Channel 进程(ChPid)

deliver_to_consumer(FetchFun,
                    #consumer{tag          = CTag,
                              ack_required = AckRequired},
                    C = #cr{ch_pid               = ChPid,
                            acktags              = ChAckTags,
                            unsent_message_count = Count},
                    QName) ->
    {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
    rabbit_channel:deliver(ChPid, CTag, AckRequired,
                           {QName, self(), AckTag, IsDelivered, Message}),
    ChAckTags1 = case AckRequired of
                     true  -> queue:in({AckTag, CTag}, ChAckTags);
                     false -> ChAckTags
                 end,
    update_ch_record(C#cr{acktags              = ChAckTags1,
                          unsent_message_count = Count + 1}),
    R.

改造思路

在 consumers 不为空的情况下,通过 FetchFun 获取消息,此时可以获取该消息的 header,取出 Tag 值(如果消息打了 Tag 标记),然后通过 priority_queue 的 filter/2 方法

filter(Pred, Q) -> fold(fun(V, P, Acc) ->
                                case Pred(V) of
                                    true  -> in(V, P, Acc);
                                    false -> Acc
                                end
                        end, new(), Q).

Pred 实现中,我们可以判断当前消息 Tag 值是否被包含在 consumerTag 中,从而可以过滤出消费特定 tag 的consumers,最后向这些 consumers 中的一个发送 Message 消息。

附一(队列进程 state 中的 consumers 信息例子)

{state,
           {queue,
               [{<7874.10509.1537>,
                 {consumer,<<"amq.ctag-VGcFg-OvjkY9xv6jqjXySQ">>,true,10,[]}},
                {<7874.3960.1541>,
                 {consumer,<<"amq.ctag-W52EK0jEsS51bnMdRuNivg">>,true,10,[]}},
                {<7874.859.1523>,
                 {consumer,<<"amq.ctag-NWQwVx3nkUfXmebmmOYzpQ">>,true,10,[]}},
                {<7874.32052.1540>,
                 {consumer,<<"amq.ctag-vE7sryPzzJGvwxMFTsulag">>,true,10,
                     []}}],
               [{<7874.4076.1541>,
                 {consumer,<<"amq.ctag-IFb6d2EnIUU7dFgqEqJs2g">>,true,10,[]}},
                {<7891.3104.1332>,
                 {consumer,<<"amq.ctag-jakQZTbwWx3DZQracuuuOQ">>,true,10,[]}},
                {<7891.3047.1332>,
                 {consumer,<<"amq.ctag-j-KUXd4rd_qfSE9h7N_rqQ">>,true,10,[]}},
                {<7891.27946.1203>,
                 {consumer,<<"amq.ctag-zdc87xrG3f7Y3EFHjLTBhw">>,true,10,[]}},
                {<7891.29647.1475>,
                 {consumer,<<"amq.ctag-f3h4rCE6prP9PbkL68rhsg">>,true,10,[]}},
                {<7891.2982.1332>,
                 {consumer,<<"amq.ctag-j6McrXJC1KsLlx0XVjonoA">>,true,10,
                     []}}],
               10},
           {active,-573798462590564,0.009264803170815055}},

附二(priority_queue 模块实现
rabbit_common

:上述思路建议在测试环境测试,考虑到有可能出现的性能问题,作为一个调研也会有很多工作要做,整个过程会涉及 RabbitMQ 服务端源码改造、编译、打包(rabbitmq-public-umbrella )以及客户端的相关改造,如果能实际尝试下,也会有不小的收获。

相关文章

  • RabbitMQ消息过滤的一个思路

    需求 生产者 Producer 向一个队列发送消息,并且为消息打上不同的 Tag。假设这个队列有3个消费者(Con...

  • rabbitmq的队列消息累积监控总结

    目标:对rabbitmq的队列进行消息累积监控。整体思路: 根据rabbitmq提供的API,获取要监控的队列信息...

  • springboot常用starter⑫-rabbitmq

    rabbitmq RabbitMQ 是部署最广泛的开源消息代理RabbitMQ 是一个消息代理:它接受和转发消息您...

  • 2019-07-26

    RabbitMQ RabbitMQ是什么 RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮...

  • RabbitMQ-核心介绍

    RabbitMQ介绍 一、RabbitMQ使用场景 RabbitMQ他是一个消息中间件,说道消息中间件【最主要的作...

  • RabbitMQ介绍

    RabbitMQ RabbitMQ简介 RabbitMQ是一个消息代理:它接受和转发消息。存储和转发二进制数据块 ...

  • RabbitMQ学习笔记(一)

    What is RabbitMQ#### RabbitMQ是一个消息代理、一个消息系统的媒介。他可以提供一个通用的...

  • Redis作为消息队列与RabbitMQ的比较

    Redis作为消息队列与RabbitMQ的比较 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议...

  • springboot2.0集成rabbitmq

    安装rabbitmq 简介: rabbitmq即一个消息队列,主要用来实现应用程序的异步和解耦,消息缓冲,消息分发...

  • 【译】RabbitMQ教程一

    内容来自:RabbitMQ Tutorials Java版 介绍 RabbitMQ是一个消息代理:它接受并转发消息...

网友评论

    本文标题:RabbitMQ消息过滤的一个思路

    本文链接:https://www.haomeiwen.com/subject/cxdnvqtx.html