美文网首页
消费者是如何获取消息处理以及进行ACK的

消费者是如何获取消息处理以及进行ACK的

作者: G__yuan | 来源:发表于2021-07-01 14:41 被阅读0次

    首先了解一个概念(消费者组)

    消费者组的意思,就是让你给一组消费者起一个名字,比如我们有一个topic叫:TopicOrderPaySuccess,然后假设有库存系统,积分系统,营销系统,仓库存储系统他们都要去消费这个topic中的数据,那么此时就应该分别给这些系统起一个消费组的名字,比如说:stock_consumer_group,marketing_consumer_group,credie_consumer_group,wms_consumer_group。设置消费者组的方式为如下:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxx_consumer_group");
    

    然后比如库存系统部署了3台机器,每台机器上的消费者组的名字都是“stock_consumer_group”,那么这3台机器就同属于一个消费者组,以此类推,每个系统的几台机器都是属于各自的消费者组的。
    然后解释一下,不同消费者之间的关系,假设库存系统和营销系统作为两个消费者组,都订阅了“topicOrderPaySuccess”这个订单支付成功消息的topic,此时假设订单系统作为生产者发送了一条消息到这个topic,正常情况下,这条消息进入Broker之后,库存系统和营销系统作为两个消费者组,每个组都会拉取到这条消息,也就说是这个订单支付成功的消息,库存系统会获取到一条,营销系统也会获取到一条,他们两都会获取到这条消息。那么另一个问题来了,比如库存系统中的有多台机器属于这个消费者组,那么它这是多台机器都会获取到这条消息,还是只有一台机器可以获取到这条消息,答案是,正常情况下,库存系统的多台机器上,只有一台机器会获取到这条消息,其他系统也同理。

    集群模式消费vs广播模式消费

    对于一个消费者组而言,获取到一条消息之后,如果这个消费者组有多台机器,那么到底是只有一台机器可以获取到这条消息,还是每台机器都可以获取到这个消息。
    上面所说的问题其实就是集群模式和广播模式的区别。
    默认情况下是集群模式,就是一个消费者组得到一条消息之后,只会交给组内的一台机器去处理,不是没每台机器都会获取到这条消息。
    修改为广播模式之后,就是同一个消费者组内的每台机器都能获取到这条消息。
    修改是否是集群模式还是广播模式的配置如下:

    consumer.setMessageModel(MessageModel.BROADCASTING); //修改为广播模式
    

    MessageQueue与消费者的关系

    大致可以认为一个Topic的多个MessageQueue会均匀分摊给消费组内的多个机器去消费,有个原则就是,一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理。

    Push模式vsPull模式

    实际上,这两个消费模式的本质是一样的,都是消费者机器主动发送请求到Broker机器上拉取一批消息下来。
    Push消费模式,本地底层也是基于消费者主动拉取的模式来实现的,只不过就是它的名字叫做push罢了,意思是Broker会尽可能实时的把新消息交给消费者机器来进行处理,它的消费时效性会更好。一般我们都是基于push模式来消费数据,pull模式的代码写起来更加的繁琐和复杂。
    Push模式的实现思路基本是:当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到Broker机器上去拉取下一批数据。另外Push模式下有一个请求挂起和长轮询机制。当你的请求发送到Broker,它如果发现没有新的消息给你处理的时候,就会让请求线程挂起,默认说是挂起15秒,然后这个期间它会有后台线程每隔一会儿就去检查一下是否有新的消息给你,还有就是如果在这个挂起过程中,如果有新的消息到达了会主动唤醒挂起的线程,然后把消息返回给你。

    Broker是如何将消息读取出来返回给消费机器的。

    Broker在接收到拉取消息请求之后,是怎么将消息读取出来返回给消费机器的。这里面涉及到两个概念,分别是ConsumeQueue和CommitLog,假设一个消费者机器发送了拉取请求到Broker了,它说我这次要拉取MessageQueue0中的消息,然后我之前没有拉取过消息,所以就从这个MessageQueue0中的第一条消息开始拉取好了。于是,Broker就会找到MessageQueue0的ComsumeQueue0,从里面找到第一条消息的offfset,接着Broker就根据读取到的这个offset,去commitLog中根据这个offset地址去读取出来这条消息的数据,然后把这条消息的数据返回给消费者机器。

    消费者机器如何处理消息,进行ACK以及提交消费进度

    消费者拉取到一批数据之后,就会回调我们注册的一个函数:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
         });
    
         consumer.start();
    

    当消费者处理完这批消息之后,消费者机器就会提交我们目前的一个消费进度到Broker上去,然后Broker就会存储我们的消费进度。

    如果消费组中出现机器宕机或者扩容加机器,会怎么处理。

    这个时候其实会进入一个rabalance环节,也就是说重新给各个消费机器分配他们要处理的MessageQueue。举个例子说明:比如现在机器01负责MessageQueue0和MessageQueue1,机器02负责MessageQueue2和MessageQueue3,现在机器02宕机了 ,那么机器01就会接管机器02之前负责的MessageQueue2和MessageQueue3.或者如果此时消费组加入了一台机器03,此时就可以把机器02之前负责的MessageQueue3转移给机器03,然后机器01就仅仅负责一个MessageQueue2的消费了,这就是负载重平衡的概念。

    相关文章

      网友评论

          本文标题:消费者是如何获取消息处理以及进行ACK的

          本文链接:https://www.haomeiwen.com/subject/btljultx.html