美文网首页
RocketMQ第四讲

RocketMQ第四讲

作者: yunhubuxi_2019 | 来源:发表于2020-01-15 23:08 被阅读0次

这一讲分析的是RocketMQ的消费者,下一讲分析broker。从第三讲开始,主要是对照源码来分析,所以这次的分享不仅仅是讲消费者的逻辑和消费过程,还整理了client端(生产者和消费者都是client)的接口图,启动之后消费端的状态。

读官方文档,读RocketMQ相关的书,还有查阅网上相关的一些博客,别人分析的再系统,再有条理,从再多的维度去分析,看再多变,我总是觉得不是很清晰,理解的不够透彻,因为涉及到的逻辑太多了,过几天就忘了。所以在学习的过程中,必须整理出自己理解,虽不能面面俱到,但因为是自己的理解,就不会忘记。

client接口和类图

client端是依赖remoting模块的,所以把remoting模块的一些接口图也整合进来了。

remoting接口

Remoting

client接口

clien

主要接口图

cient_remoting

接口依赖关系图

从下面这张图中可以清晰的看出,client只用的了remoting接口的RemotingClient。RemotingServer接口在Namesrv和Broker中用到了。

client_remoting_d

接口总结

RocketMQ的设计中,分了三层接口:

用户接口,实现接口,remoting接口。用户接口是开箱即用的,有默认的实现类,实现类是把用户的配置,业务逻辑,配置文件信息,在启动的时候,初始化进去。核心的业务逻辑在实现接口的实现类里。

实现接口的实现类持有MQClientInstance,MQClientInstance通过MQClientAPIImpl和remoting接口层打交道。MQClientInstance,MQClientAPIImpl和MQAdminImpl都没有接口,MQAdminImpl持有MQClientInstance,访问remoting接口也是通过MQClientAPIImpl类。

RocketMQ消息消费概述

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。

集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

集群模式下,多个消费者如何对消息队列进行负载?消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被同一个消费者消费,一个消费者可以消费多少个消息队列。

消息消费者初探

推模式的消费者MQPushConsume

image.png

实现类DefaultMQPushConsumer

image.png image.png

DefaultMQPushConsumerImpl分析

  • DefaultMQPushConsumerImpl :消息消息者默认实现类,应用程序中直接用该类的实例完成消息的消费,并回调业务方法。
image.png
  • RebalanceImpl :字面上的意思(重新平衡)也就是消费端消费者与消息队列的重新分布,与消息应该分配给哪个消费者消费息息相关。

  • MQClientInstance: 消息客户端实例,负载与MQ服务器(Broker,Nameserver)交互的网络实现

  • PullAPIWrapper: pull与Push在RocketMQ中,其实就只有Pull模式,所以Push其实就是用pull封装一下

  • MessageListenerInner: 消费消费回调类,当消息分配给消费者消费时,执行的业务代码入口

  • OffsetStore: 消息消费进度保存

  • ConsumeMessageService: 消息消费逻辑

消费者启动流程

  • 构建主题订阅信息SubscriptionData并加入到RebalanceImpl的订阅消息中
  • 初始化MQClientInstance、RebalanceImple等
  • 初始化消息进度。如果消费时集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息进度存储在消费端。
  • 根据是否是顺序消费,创建消费端线程服务
  • 向MQClientInstance注册消费者

启动流程图

流程图

启动分析

DefaultMQPushConsumer

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        consumer.setNamesrvAddr("localhost:9876");
        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");
        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

## start方法
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

defaultMQPushConsumerImpl

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

关键部分 ConsumeMessageService.png

this.consumeMessageService =
   new ConsumeMessageConcurrentlyService(this, 
         (MessageListenerConcurrently) this.getMessageListenerInner());
this.consumeMessageService.start();

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.mQClientFactory = MQClientManager.getInstance()
    .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

mQClientFactory.start();

MQClientInstance

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

没有消息的时候

image.png

有消息的时候

image.png

启动后线程

  • PullMessageService
  • RebalanceService
  • 定时任务线程MQClientFactoryScheduledThread和PullMessageServiceScheduledThread
  • Netty相关线程,NettyClientPublicExecutor,NettyClientWorkerThread,NettyClientSelector
  • 消费者线程ConsumeMessageThread

消息拉取

基于PUSH模式来详细分析消息拉取机制。

从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

PullMessageService实现机制

image.png

ProcessQueue实现机制

ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中。PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

ProcessQueue核心属性

image.png

ProcessQueue核心方法

image.png

消息拉取基本流程

  1. 消息拉取客户端消息拉取请求封装
  2. 消息服务器查找并返回消息
  3. 消息拉取客户端处理返回的消息

相关文章

网友评论

      本文标题:RocketMQ第四讲

      本文链接:https://www.haomeiwen.com/subject/cqpxzctx.html