RocketMQ分享

作者: 土豆肉丝盖浇饭 | 来源:发表于2019-06-12 14:45 被阅读29次

    比较粗的分享

    RocketMQ分享

    介绍

    技术架构


    RocketMQ架构上主要分为四部分,如上图所示:

    • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
    • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
    • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
    • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

    消息类型

    • 普通消息
    • 延迟消息
    • 顺序消息
    • 事务消息

    发送模式

    • 同步
    • 异步
    • 单向

    消费模式

    • 集群
    • 广播

    刷盘模式

    • 同步
    • 异步

    消息过滤

    • 表达式
      • TAG
      • SQL
    • 过滤类

    为何高性能

    • 顺序写随机读
    • 文件IO mmap 内存映射
    • 网络IO mmap+write 零拷贝
    • 异步刷盘 直接内存缓冲
    • 分区消费
    • 主从架构
    • 其他代码细节上的设计

    存储结构

    image.png

    存储结构和MQ的框架提供的功能息息相关

    这些文件默认都存储在/{user.home}/store下

    文件名 作用
    commitlog 存储实际消息
    consumequeue 存储每个queue对应的消息,消费者要通过consumequeue去获取消息消费
    index 消息索引文件
    abort 用于判断是否异常关闭
    checkpoint 启动时恢复使用
    consumerOffset.json 记录每个consumerGroup的消费进度
    dealyOffset.json 记录每个延迟topic的消费进度
    subscriotionGroup.json 记录consumergroup的一些配置
    topics.json 记录所有topics

    Broker注册到NameServer

    在broker启动的时候 会开启一个线程池,定时向NameServer更新路由信息,同时topic发生改变时,也会主动触发

    broker端
    BrokerController.start

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
            //TopicConfigSerializeWrapper封装了路由信息
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            //如果当前broker的权限不是可读可写,用broker的权限覆盖topic的权限
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
    
            //needRegister会查询每个nameserver的dataversion,如果不一致代表需要更新路由信息
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }
    
    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
            TopicConfigSerializeWrapper topicConfigWrapper) {
            //通过brokerOuterAPI发送到nameserver
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    
            //下面暂时不知道具体干嘛用
            if (registerBrokerResultList.size() > 0) {
                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                if (registerBrokerResult != null) {
                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    }
    
                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                    if (checkOrderConfig) {
                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                    }
                }
            }
        }
    

    nameserver端
    通过DefaultRequestProcessor处理各种请求,注册broker逻辑如下

    case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
    
    public RemotingCommand registerBroker(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
            final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
            final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
    
            TopicConfigSerializeWrapper topicConfigWrapper;
            if (request.getBody() != null) {
                topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
            } else {
                topicConfigWrapper = new TopicConfigSerializeWrapper();
                topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
                topicConfigWrapper.getDataVersion().setTimestamp(0);
            }
    
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                topicConfigWrapper,
                null,
                ctx.channel()
            );
    
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
    
            // TODO: 2019-05-14 这里面存了什么
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    

    通过 this.namesrvController.getRouteInfoManager().registerBroker注册

    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
            RegisterBrokerResult result = new RegisterBrokerResult();
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
    
                    //更新cluster和broker关系
                    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                    if (null == brokerNames) {
                        brokerNames = new HashSet<String>();
                        this.clusterAddrTable.put(clusterName, brokerNames);
                    }
                    brokerNames.add(brokerName);
    
                    boolean registerFirst = false;
    
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        registerFirst = true;
                        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                        this.brokerAddrTable.put(brokerName, brokerData);
                    }
                    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                    //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                    //The same IP:PORT must only have one record in brokerAddrTable
                    Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> item = it.next();
                        if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                            it.remove();
                        }
                    }
    
                    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                    registerFirst = registerFirst || (null == oldAddr);
    
                    if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                            ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                            if (tcTable != null) {
                                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                    //保存topic信息
                                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                                }
                            }
                        }
                    }
    
                    //保存broker信息
                    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                            System.currentTimeMillis(),
                            topicConfigWrapper.getDataVersion(),
                            channel,
                            haServerAddr));
                    if (null == prevBrokerLiveInfo) {
                        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                    }
    
                    if (filterServerList != null) {
                        if (filterServerList.isEmpty()) {
                            this.filterServerTable.remove(brokerAddr);
                        } else {
                            this.filterServerTable.put(brokerAddr, filterServerList);
                        }
                    }
    
                    //如果当前发送请求的broker不是master
                    //给他返回它的master信息
                    if (MixAll.MASTER_ID != brokerId) {
                        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                        if (masterAddr != null) {
                            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                            if (brokerLiveInfo != null) {
                                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                                result.setMasterAddr(masterAddr);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("registerBroker Exception", e);
            }
    
            return result;
        }
    

    最主要的路由信息通过createAndUpdateQueueData保存

    //将TopicConfig更新到本地的topicQueueTable
        //这个结构维护每个topic在那些broker有存储,以及在对应broker的配置
        private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
            QueueData queueData = new QueueData();
            queueData.setBrokerName(brokerName);
            queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
            queueData.setReadQueueNums(topicConfig.getReadQueueNums());
            queueData.setPerm(topicConfig.getPerm());
            queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    
            List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
            if (null == queueDataList) {
                queueDataList = new LinkedList<QueueData>();
                queueDataList.add(queueData);
                this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
                log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
            } else {
                boolean addNewOne = true;
    
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    if (qd.getBrokerName().equals(brokerName)) {
                        if (qd.equals(queueData)) {
                            addNewOne = false;
                        } else {
                            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                                queueData);
                            it.remove();
                        }
                    }
                }
    
                if (addNewOne) {
                    queueDataList.add(queueData);
                }
            }
        }
    

    消息发送

    普通消息

    producer端

    生产者Topic发布信息更新

    在MQClientInstance实例start时,会启动定时任务每30秒定时更新路由信息,路由信息用来知道该topic下面一共有多少queue可以发送消息

    MQClientInstance.start->startScheduledTask

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
    public void updateTopicRouteInfoFromNameServer() {
            Set<String> topicList = new HashSet<String>();
    
            // Consumer
            {
                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQConsumerInner> entry = it.next();
                    MQConsumerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<SubscriptionData> subList = impl.subscriptions();
                        if (subList != null) {
                            for (SubscriptionData subData : subList) {
                                topicList.add(subData.getTopic());
                            }
                        }
                    }
                }
            }
    
            // Producer
            {
                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQProducerInner> entry = it.next();
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<String> lst = impl.getPublishTopicList();
                        topicList.addAll(lst);
                    }
                }
            }
    
            //获取producer和consumer订阅的topic 去nameserver查询路由信息
            for (String topic : topicList) {
                this.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        if (isDefault && defaultMQProducer != null) {
                            //这边逻辑针对没创建的topic
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            //这边是已经存在的topic
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
                        if (topicRouteData != null) {
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            boolean changed = topicRouteDataIsChange(old, topicRouteData);
                            if (!changed) {
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
    
                            //如果topic路由信息发生改变
                            if (changed) {
                                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    
                                //更新brokerAddrTable
                                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                }
    
                                // Update Pub info
                                // 更新producer关于这个topic的发布信息 简单讲 就是一共有几个queue可以发消息
                                {
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQProducerInner> entry = it.next();
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // Update sub info
                                // 跟新订阅信息 也就是有几个queue可以选择接受消息
                                {
                                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQConsumerInner> entry = it.next();
                                        MQConsumerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                        }
                                    }
                                }
                                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                                this.topicRouteTable.put(topic, cloneTopicRouteData);
                                return true;
                            }
                        } else {
                            log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                        }
                    } catch (Exception e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                        }
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
                }
            } catch (InterruptedException e) {
                log.warn("updateTopicRouteInfoFromNameServer Exception", e);
            }
    
            return false;
        }
    

    选择分区发送

    选择一个MessageQueue进行消息发送

    //发送失败重试
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                callTimeout = true;
                                break;
                            }
    
                            //发送消息
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
    
                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
    

    选择分区的逻辑默认为轮询MessageQueue,也可以自己send的时候指定MessageQueueSelector

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, selector, arg);
        }
    

    broker端

    1. SendMessageProcessor处理请求,消息本体落CommitLog
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                            final RemotingCommand request,
                                            final SendMessageContext sendMessageContext,
                                            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
            final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    
            response.setOpaque(request.getOpaque());
    
            response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
            response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    
            log.debug("receive SendMessage request command, {}", request);
    
            final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
            if (this.brokerController.getMessageStore().now() < startTimstamp) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
                return response;
            }
    
            response.setCode(-1);
            super.msgCheck(ctx, requestHeader, response);
            if (response.getCode() != -1) {
                return response;
            }
    
            final byte[] body = request.getBody();
    
            int queueIdInt = requestHeader.getQueueId();
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    
            if (queueIdInt < 0) {
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
            }
    
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(requestHeader.getTopic());
            msgInner.setQueueId(queueIdInt);
    
            if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
                return response;
            }
    
            msgInner.setBody(body);
            msgInner.setFlag(requestHeader.getFlag());
            MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
            msgInner.setPropertiesString(requestHeader.getProperties());
            msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
            msgInner.setBornHost(ctx.channel().remoteAddress());
            msgInner.setStoreHost(this.getStoreHost());
            msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
            PutMessageResult putMessageResult = null;
            Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
            String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
                    return response;
                }
                //处理事务prepare消息
                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
            } else {
                //调用store保存消息
                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            }
    
            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    
        }
    
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so putMessage is forbidden");
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is slave mode, so putMessage is forbidden ");
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            if (!this.runningFlags.isWriteable()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            } else {
                this.printTimes.set(0);
            }
    
            if (msg.getTopic().length() > Byte.MAX_VALUE) {
                log.warn("putMessage message topic length too long " + msg.getTopic().length());
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            }
    
            if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
            }
    
            if (this.isOSPageCacheBusy()) {
                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
            }
    
            long beginTime = this.getSystemClock().now();
            //将消息放入commitlog
            PutMessageResult result = this.commitLog.putMessage(msg);
    
            long eclipseTime = this.getSystemClock().now() - beginTime;
            if (eclipseTime > 500) {
                log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
    
            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
    
            return result;
        }
    
    1. ReputMessageService构建ConsumeQueue,IndexFile
      在DefaultMessageStore会启动一个定时任务ReputMessageService,用于根据Commitlog构建ConsumeQueue和IndexFile。消息到了ConsumeQueue,消费者才能去消费。
    private void doReput() {
                if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                    log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                    this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
                }
                for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    
                    if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                        break;
                    }
    
                    //getData是一个个mappedfile返回的
                    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                    if (result != null) {
                        try {
                            this.reputFromOffset = result.getStartOffset();
    
                            for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                                //每调用一次checkMessageAndReturnSize,会从bytebuff解析出一条消息的关键信息返回
                                DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                                int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                                if (dispatchRequest.isSuccess()) {
                                    if (size > 0) {
                                        //进行dispatch
                                        //执行CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex的dispatch方法
                                        DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                        if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                            //消息到达的回调
                                            //通知阻塞的客户端获取消息请求 消息到达
                                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        }
    
                                        this.reputFromOffset += size;
                                        readSize += size;
                                        if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                            //slave记录一些统计信息
                                            DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                            DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                                .addAndGet(dispatchRequest.getMsgSize());
                                        }
                                    } else if (size == 0) {
                                        //如果size等于0 跳到下一个mappedfile
                                        this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                        readSize = result.getSize();
                                    }
                                } else if (!dispatchRequest.isSuccess()) {
    
                                    if (size > 0) {
                                        log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                        this.reputFromOffset += size;
                                    } else {
                                        doNext = false;
                                        log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);
    
                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        } finally {
                            result.release();
                        }
                    } else {
                        doNext = false;
                    }
                }
            }
    

    顺序消息

    通过MessageSelector将需要顺序发送的消息发送到固定一个queue

    延迟消息

    发送的时候设置

    msg.setDelayTimeLevel(delayLevel);
    

    会将原消息的topic替换为SCHEDULE_TOPIC_XXX,queueid替换为对应延迟时间的延迟队列,然后将原有的topic和queueid保存到消息的properties,发送到延迟队列。在Broker针对每个延迟队列会起一个线程轮询查看消息是否到达延迟时间,如果达到,在properties取出原有的topic的queueid构造新的消息发送到原来应该发送的队列。
    18个延迟队列分别对应一下延迟时间1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    在CommitLog.putMessage会做上述修改

    //延迟消息 只支持普通消息 和 事务提交消息
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                // Delay Delivery
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    
                    //延迟消息会放到一个特定的队列
                    //并且把本该发往的队列元信息保存起来
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    // 保存元信息
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    //修改topic和queue为延迟队列的 queueId代表延迟等级
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
    

    延迟消息到期重新放入Commitlog的定时任务为ScheduleMessageService

    消息的消费

    image.png

    我们以DefaultMQPushConsumer为例

    消费者Topic订阅信息更新

    和生产者Topic订阅信息在一套逻辑内,生产者和消费者共用MQClientInstance实例

    Rebalance得到分配的MessageQueue

    Rebalance用于给consumer分配queue,存在一个queue只能被一个消费者消费,一个消费者能消费多个queue的约束。

    在DefaultMQPushConsumer中会初始化RebalanceImpl实例,但是实际的触发在MQClientInstance的RebalanceService定时触发

    我们只关注rebalance的实际逻辑

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                case BROADCASTING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    if (mqSet != null) {
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, mqSet);
                            log.info("messageQueueChanged {} {} {} {}",
                                consumerGroup,
                                topic,
                                mqSet,
                                mqSet);
                        }
                    } else {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                    break;
                }
                case CLUSTERING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    //从获取所有consumer集合
                    //这个信息从broker获取。。topic属于哪个broker 就从哪个broker获取
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
    
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
    
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                        List<MessageQueue> allocateResult = null;
                        try {
                            //这里是rebalance的逻辑 会返回那些MessageQueue属于当前消费者
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        //rebalance得到MessageQueue后,更新当前消费者的processqueue,通过processqueue去拉取消息
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(
                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                                allocateResultSet.size(), allocateResultSet);
                            //通知messageQueue修改了
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }
    

    根据MessageQueue更新ProcessQueue

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            //这段逻辑针对老的应该删除的pq
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    if (!mqSet.contains(mq)) {
                        //如果mq不属于当前消费者 设置对应pq为dropped
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    } else if (pq.isPullExpired()) {
                        switch (this.consumeType()) {
                            case CONSUME_ACTIVELY:
                                break;
                            case CONSUME_PASSIVELY:
                                //如果pull超时 也设置为drop
                                pq.setDropped(true);
                                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                                }
                                break;
                            default:
                                break;
                        }
                    }
                }
            }
    
            //这段逻辑针对新的应该增加的pq
            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    //如果mq对应pq不存在 创建新的pq
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    //获取该mq应该从哪里开始消费
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            //创建新的pullRequest
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }
    
            //向PullMessageService的pullRequestQueue放入pullRequestList
            //触发拉取消息
            this.dispatchPullRequest(pullRequestList);
    
            return changed;
        }
    

    拉取消息

    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
            for (PullRequest pullRequest : pullRequestList) {
                this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
            }
        }
    
    public void executePullRequestImmediately(final PullRequest pullRequest) {
            this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
        }
    
    public void executePullRequestImmediately(final PullRequest pullRequest) {
            try {
                this.pullRequestQueue.put(pullRequest);
            } catch (InterruptedException e) {
                log.error("executePullRequestImmediately pullRequestQueue.put", e);
            }
        }
    

    put之后会将阻塞的PullMessageService唤醒

    @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                try {
                    //阻塞等待 知道有PullRequest
                    //初始化的时候 PullRequest从rebalanceImpl那边来
                    PullRequest pullRequest = this.pullRequestQueue.take();
                    this.pullMessage(pullRequest);
                } catch (InterruptedException ignored) {
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }
            }
    
            log.info(this.getServiceName() + " service end");
        }
    
    private void pullMessage(final PullRequest pullRequest) {
            final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
            if (consumer != null) {
                DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                //调用具体consumer实例的pullMessage方法
                impl.pullMessage(pullRequest);
            } else {
                log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
            }
        }
    
    public void pullMessage(final PullRequest pullRequest) {
            final ProcessQueue processQueue = pullRequest.getProcessQueue();
            //如果是drop的pq 停止执行
            if (processQueue.isDropped()) {
                log.info("the pull request[{}] is dropped.", pullRequest.toString());
                return;
            }
    
            //设置pull时间点
            pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    
            //状态不对 3秒后在执行
            //重新将当前pullRequest放入PullMessageService
            try {
                this.makeSureStateOK();
            } catch (MQClientException e) {
                log.warn("pullMessage exception, consumer state not ok", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                return;
            }
    
            //消费者停止了 延迟执行
            if (this.isPause()) {
                log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
                return;
            }
    
            long cachedMessageCount = processQueue.getMsgCount().get();
            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    
            //流控
            //如果pq中的消息超过默认1000条
            //延迟拉取
            if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
    
            //流控
            //如果pq中的消息大于100M 延迟拉取
            if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
    
            if (!this.consumeOrderly) {
                //不是顺序消费
                //流控
                //如果pq内消息跨度大于2000 延迟消费
                //这个指的是 consumequeue的offset  不是字节跨度
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, queueMaxSpanFlowControlTimes);
                    }
                    return;
                }
            } else {
                //顺序消费 需要向broker锁定pq
                if (processQueue.isLocked()) {
                    if (!pullRequest.isLockedFirst()) {
                        //第一次获取消息 重新矫正位置
                        //lockedfirst代表是否第一次锁定过
                        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                        boolean brokerBusy = offset < pullRequest.getNextOffset();
                        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                            pullRequest, offset, brokerBusy);
                        if (brokerBusy) {
                            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                                pullRequest, offset);
                        }
                        //设置已经第一次锁定过
                        pullRequest.setLockedFirst(true);
                        pullRequest.setNextOffset(offset);
                    }
                } else {
                    //如果pq没有锁定 延迟拉取
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                    log.info("pull message later because not locked in broker, {}", pullRequest);
                    return;
                }
            }
    
            //获取不到订阅配置 延迟消费
            final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (null == subscriptionData) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.warn("find the consumer's subscription failed, {}", pullRequest);
                return;
            }
    
            final long beginTimestamp = System.currentTimeMillis();
    
            //由于拉取消息是异步的 拉取成功/异常会回调这个实现
            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        //处理下返回的结果 比如 过滤
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
    
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);
    
                                long firstMsgOffset = Long.MAX_VALUE;
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    //没有消息 将pullRequest重新放入pullMessageService 延迟执行
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
                                    //消息放入processQueue
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    //发送消费消息的请求 激活consumeMessageService
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
    
                                    //将pullRequest重新放入pullMessageService
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }
    
                                if (pullResult.getNextBeginOffset() < prevRequestOffset
                                    || firstMsgOffset < prevRequestOffset) {
                                    log.warn(
                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                        pullResult.getNextBeginOffset(),
                                        firstMsgOffset,
                                        prevRequestOffset);
                                }
    
                                break;
                            case NO_NEW_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case NO_MATCHED_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case OFFSET_ILLEGAL:
                                log.warn("the pull request offset illegal, {} {}",
                                    pullRequest.toString(), pullResult.toString());
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                                pullRequest.getProcessQueue().setDropped(true);
                                DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    
                                    @Override
                                    public void run() {
                                        try {
                                            DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                pullRequest.getNextOffset(), false);
    
                                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
    
                                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    
                                            log.warn("fix the pull request offset, {}", pullRequest);
                                        } catch (Throwable e) {
                                            log.error("executeTaskLater Exception", e);
                                        }
                                    }
                                }, 10000);
                                break;
                            default:
                                break;
                        }
                    }
                }
    
                @Override
                public void onException(Throwable e) {
                    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("execute the pull request exception", e);
                    }
    
                    //发生异常 延迟拉取
                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                }
            };
    
            // TODO: 2019-05-14 这个标记干嘛用
            boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {
                    commitOffsetEnable = true;
                }
            }
    
            //是否有过滤表达式 或者 类过滤 配置
            String subExpression = null;
            boolean classFilter = false;
            SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (sd != null) {
                if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                    subExpression = sd.getSubString();
                }
    
                classFilter = sd.isClassFilterMode();
            }
    
            int sysFlag = PullSysFlag.buildSysFlag(
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
            );
            try {
                //调用实际拉取消息的API
                this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    subscriptionData.getExpressionType(),
                    subscriptionData.getSubVersion(),
                    pullRequest.getNextOffset(),
                    this.defaultMQPushConsumer.getPullBatchSize(),
                    sysFlag,
                    commitOffsetValue,
                    BROKER_SUSPEND_MAX_TIME_MILLIS,
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                    CommunicationMode.ASYNC, //异步拉取消息
                    pullCallback //注意这个回调也传过去了
                );
            } catch (Exception e) {
                //发生错误 延迟拉取
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        }
    

    PullCallback是拉取消息的回调,会把拉取到的消息保存到ProcessQueue,然后触发消息消费,关键代码如下

    //消息放入processQueue
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    //发送消费消息的请求 激活consumeMessageService
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
    

    consumeMessageService的实现有ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,分别对应并发消费和顺序消费。调用submitConsumeRequest方法会把触发这个服务进行消息消费。

    并发消费

    ConsumeRequest继承了Runnable接口,ConsumeMessageConcurrentlyService只是提供了一个线程池让他执行。

    public void run() {
                //如果processQueue被drop 停止消费
                if (this.processQueue.isDropped()) {
                    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                    return;
                }
    
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
                ConsumeConcurrentlyStatus status = null;
    
                //消费消息的hook before
                ConsumeMessageContext consumeMessageContext = null;
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(messageQueue);
                    consumeMessageContext.setMsgList(msgs);
                    consumeMessageContext.setSuccess(false);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
                    //如果是retry的消息 还原retry topic
                    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                    //设置消费开始时间
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    //并发执行消息
                    //虽然是msgs 但是默认是一条
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageConcurrentlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    if (hasException) {
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                }
    
                if (null == status) {
                    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
                //hook after
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }
    
                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
    
                if (!processQueue.isDropped()) {
                    //消息消费结果的处理
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }
    

    通过我们配置的MessageListenerConcurrently对消息进行消费

    这边需要注意下对消费成功货失败的处理,处理逻辑在processConsumeResult中

    public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
        ) {
            //ackIndex默认为Integer.MAX
            int ackIndex = context.getAckIndex();
    
            if (consumeRequest.getMsgs().isEmpty())
                return;
    
            //下面的逻辑主要是ackIndex改变了
            switch (status) {
                case CONSUME_SUCCESS:
                    if (ackIndex >= consumeRequest.getMsgs().size()) {
                        //设置为消息的长度-1
                        ackIndex = consumeRequest.getMsgs().size() - 1;
                    }
                    int ok = ackIndex + 1;
                    //不可能有失败。。。
                    int failed = consumeRequest.getMsgs().size() - ok;
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                    break;
                case RECONSUME_LATER:
                    //失败ackIndex=-1
                    ackIndex = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                    break;
                default:
                    break;
            }
    
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                    }
                    break;
                case CLUSTERING:
                    //ackIndex表示确认消息的位置,现在的代码看下来,要么全成功,要么全失败
                    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        //消费失败的消息  重新插入到commitlog  发送到retry topic 实际上还是用延迟队列实现
                        boolean result = this.sendMessageBack(msg, context);
                        //如果发送请求失败 那么本地再消费一次试试
                        if (!result) {
                            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                            msgBackFailed.add(msg);
                        }
                    }
    
                    if (!msgBackFailed.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(msgBackFailed);
    
                        //如果消费失败 重新消费
                        this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                default:
                    break;
            }
    
            //更新offsetstore
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
        }
    

    成功的话更新offsetStore
    失败通过submitConsumeRequestLater方法将这条消息重新插入到commitlog,发送到retry队列,retry队列底层还是由延迟队列实现。如果超过重试次数,就会放到DLQ死信队列。

    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
            int delayLevel = context.getDelayLevelWhenNextConsume();
    
            try {
                this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
                return true;
            } catch (Exception e) {
                log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
            }
    
            return false;
        }
    
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            try {
                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
            } catch (Exception e) {
                //如果上面的api失败
                //直接构造消息发送
                log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
    
                Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
    
                String originMsgId = MessageAccessor.getOriginMessageId(msg);
                MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
    
                newMsg.setFlag(msg.getFlag());
                MessageAccessor.setProperties(newMsg, msg.getProperties());
                MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
                MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
                MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
                newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
    
                this.mQClientFactory.getDefaultMQProducer().send(newMsg);
            }
        }
    

    顺序消费

    主要是对processqueue加锁,有兴趣自己了解

    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
    

    事务消息

    image.png
    1. 本地事务开发
    2. 执行业务逻辑
    3. 发送prepare消息
    4. 本地事务完成 或 回滚
    5. 消息事务回查

    相关文章

      网友评论

        本文标题:RocketMQ分享

        本文链接:https://www.haomeiwen.com/subject/mwhlfctx.html