美文网首页RocketMQ系列
RocketMQ系列(四):consumer

RocketMQ系列(四):consumer

作者: 范柏柏 | 来源:发表于2020-06-06 19:59 被阅读0次

    两种模式

    • 集群模式
    • 广播模式
      集群模式:topic下的同一条消息只允许被同一个group下的其中一个消费者消费
      广播模式:topic下的同一条消息被集群内所有消费者消费

    推还是拉

    rocketMq可以说是推的。但这个推其实是对拉模式的一种封装。
    broker和consumer保持长连接。consumer发送拉取请求。拉取请求的触发条件:

    1. broker有消息进来的时候,会通知consumer,让consumer来拉
    2. consumer在拉完一次后,会继续发出拉取动作,拉完再拉,拉完再拉

    PullRequest

    consumer发送拉取请求。请求体如下


    PullRequest.png

    consumerGroup: 消费者组
    messageQueue: 待拉取消费队列
    processQueue: 消息处理队列。从broker拉取到的消息,先存入processQueue,然后再提交到消费者消息线程池消费。
    nextOffset: 待拉取的messageQueue偏移量
    lockedFirst: 是否被锁定

    请求体中有待拉取消费队列,consumer怎么知道的从哪个messageQueue拉取?

    rocketMq底层,消息指定分配给消费者的实现,是通过把queue队列分配给消费者的方式完成的。

    将queue队列指定给特定的consumer后,该queue中的所有消息,都由该consumer进行消费。

    怎么把queue分配给consumer的呢,当然也是有策略的

    /**
     * 为消费者分配queue的策略算法接口
     */
    public interface AllocateMessageQueueStrategy {
    
        /**
         * Allocating by consumer id
         *
         * @param consumerGroup 当前 consumer群组
         * @param currentCID 当前consumer id
         * @param mqAll 当前topic的所有queue实例引用
         * @param cidAll 当前 consumer群组下所有的consumer id set集合
         * @return 根据策略给当前consumer分配的queue列表
         */
        List<MessageQueue> allocate(
            final String consumerGroup,
            final String currentCID,
            final List<MessageQueue> mqAll,
            final List<String> cidAll
        );
    
        /**
         * 算法名称
         *
         * @return The strategy name
         */
        String getName();
    }
    

    当然,rocketMq也提供了默认的分配策略。


    分配策略.png
    算法名称 含义
    AllocateMessageQueueAveragely 平均分配算法
    AllocateMessageQueueAveragelyByCircle 基于环形平均分配算法
    AllocateMachineRoomNearby 基于机房临近原则算法
    AllocateMessageQueueByMachineRoom 基于机房分配算法
    AllocateMessageQueueConsistentHash 一致性hash算法
    AllocateMessageQueueByConfig 基于配置分配算法

    rocketMq默认使用平均分配算法

    public class DefaultMQPushConsumer{    
        /**
         * Default constructor.
         */
        public DefaultMQPushConsumer() {
            this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
        }
    
        /**
         * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
         *
         * @param consumerGroup Consume queue.
         * @param rpcHook RPC hook to execute before each remoting command.
         * @param allocateMessageQueueStrategy message queue allocating algorithm.
         */
        public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
            AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
            this.consumerGroup = consumerGroup;
            this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
            defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
        }
    }
    

    也不可能分配一次就不管了。rocketMq的策略是,每20s进行一次消息负载。也就是consumer和broker的重绑定。

    消息体中还有offset,offset存哪

    广播模式:因为所有队列都会被所有消费者消费。所以读到哪里的标记,记录在消费者那里。offset存在消费者。

    集群模式:队列中的消息,只会被group内的一个consumer消费。所以,offset要存在broker上。

    消息拉取流程

    1. consumer发送拉取请求
    2. broker收到请求后,根据group、queue、offset返回消息。
    3. consumer收到消息。

    相关文章

      网友评论

        本文标题:RocketMQ系列(四):consumer

        本文链接:https://www.haomeiwen.com/subject/fujftktx.html