美文网首页
RocketMQ-普通消息

RocketMQ-普通消息

作者: Travis_Wu | 来源:发表于2020-07-20 10:22 被阅读0次

一、摘要

  1. 默认消息发送超时时间为3s
  2. 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3次
  3. 消息内容不能为0,也不能超过4M
  4. 同步消息发送才会有重试机制,异步发送和oneway发送模式都只有一次发送机会。同步发送 1+重试次数(默认2)
  5. pull模式、push模式启动的时候都不会检查nameserv,pull模式在fetchqueue时没有nameserv时会报错,push模式没有nameserv不会报错

二、简单实例(官方实例)

  1. 发送实例
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}
  1. pull接收实例
public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        
      DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ProducerGroupName");
      consumer.setNamesrvAddr("localhost:9876");
      
      consumer.start();
      
      // 从nameserv上拉取的topic的队列信息
      Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC_TEST");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    // 远程拉取消息,拉取32个
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND://找到消息
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println("msgId:"+m.getMsgId());
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

//        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}
  1. push接收实例
public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TOPIC_TEST", "*");// topic  tag
        /**
         * 全新的消费组 才适用这些策略
         * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
         * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
         * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
         */
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for(MessageExt msg:msgs){
                    System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody()));
                }
                /**
                 * CONSUME_SUCCESS 消费成功
                 * RECONSUME_LATER 重试消费,重试次数可以设置,默认是16次
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

三、RocketMQ发送普通消息的全流程解读

① DefaultMQProducer的启动流程

  • 入口:DefaultMQProducerImpl.start()
  • 初始化得到 MQClientInstance 实例对象
    1. MQClientInstance 的理解:
      • MQClientInstance 封装了 RocketMQ 网络处理的API,是Producer、Consumer与NameServer、Broker打交道的网络通道
      • 同一个JVM中不同 consumer 和 producer 获取的 MQClientInstance 实例是同一个
    2. MQClientInstance 创建过程:
      • 先设置 clientId,clientId 组成有 ip@pid@unitname,unitname 可选,例如:10.204.246.26@27066
      • 根据 clientId 从 factoryTable 中获取,也就是缓存 map 中获取。没有则new一个 MQClientInstance
  • 注册 Producer
    1. 将当前 Producer 缓存到 MQClientInstance 实例的 producerTable 成员变量中
    2. key 为 Producer 的名称,value 为当前 Producer 的实例
  • 将默认Topic(“TBW102”)保存至本地缓存变量 topicPublishInfoTable 中
  • MQClientInstance 实例对象调用自己的start()方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务
    1. Producer 感知 topic 的路由信息变化是通过定时线程,每30s去 nameserver 拉取,然后对本地缓存的路由信息更新
    2. 每30s向 Broker 发送一次心跳
    3. 每30s更新本地缓存的存活 Broker
    private void startScheduledTask() {
         //如果没有namesrvAddr,则启动定时器获取namesrvAddr地址(2分钟执 行1次)
         if (null == this.clientConfig.getNamesrvAddr()) {
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
                     try {        
                       MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                     } catch (Exception e) {
                         log.error("ScheduledTask fetchNameServerAddr exception", e);
                     }
                 }
             }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
         }
    
         //每30秒更新一次所有的topic的路由信息,延迟10毫秒执行
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
                     MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                 } catch (Exception e) {
                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                 }
             }
         }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
         // 每30秒对下线的broker进行移除
         // 每30秒发送一次心跳
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
                     MQClientInstance.this.cleanOfflineBroker();
                     MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                 } catch (Exception e) {
                     log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                 }
             }
         }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
         // 持久化消费端offSet,每5s执行一次
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
                     MQClientInstance.this.persistAllConsumerOffset();
                 } catch (Exception e) {
                     log.error("ScheduledTask persistAllConsumerOffset exception", e);
                 }
             }
         }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
         //定时调整消费者端线程池大小
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
                     MQClientInstance.this.adjustThreadPool();
                 } catch (Exception e) {
                     log.error("ScheduledTask adjustThreadPool exception", e);
                 }
             }
         }, 1, 1, TimeUnit.MINUTES);
     }
    
  • 最后向所有的Broker代理服务器节点发送心跳包
    image.png

② send发送方法的核心流程

  • 消息校验
    消息的校验主要就是校验topic以及消息体的长度,发送消息的最大长度不能超过4M
  • 尝试获取TopicPublishInfo的路由信息
    先从缓存里面取,缓存没有则从nameserver取,取到之后会放到缓存
    我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。
    调用tryToFindTopicPublishInfo()方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer方法)
    这里分别会存在以下两种场景:
    1. 生产者第一次发送消息(此时,Topic在NameServer中并不存在):
      因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102的TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量topicPublishInfoTable。
      另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage方法里面的super.msgCheck(ctx, requestHeader, response)消息前置校验中,会调用TopicConfigManager的createTopicInSendMessageMethod方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)
    2. 生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后tryToFindTopicPublishInfo方法直接可以return。
public class TopicPublishInfo {
    //是否是顺序消息
    private boolean orderTopic = false;
    //是否存在路由信息
    private boolean haveTopicRouterInfo = false;
    //改topic对应的逻辑队列,每一个逻辑队列就对应一个MessageQueue
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //用于选择消息队列的值,每选择一次消息队列,该值会自增1。
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // topic路由元数据,如borker的元信息和队列的元信息
    private TopicRouteData topicRouteData;
    //....
    //....
}

//数据结构如下:
TopicPublishInfo [
orderTopic=false, 
messageQueueList=[MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=0], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=1], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=2], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=3]], 
sendWhichQueue=ThreadLocalIndex{threadLocalIndex=null},
haveTopicRouterInfo=true]
    /**
     * 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
     * 如果没有则更新路由信息,从nameserver端拉取最新路由信息
     *
     * topicPublishInfo
     * 
     * @param topic
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //step1.先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //step1.2 然后从nameServer上更新topic路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            //step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            /**
             *  第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
             */
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
/**
     * 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
     *
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
       //这里由于是同步方式发送,所以直接return response的响应
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            //如果NameServer中不存在待发送消息的Topic
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            //如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
image.png
  • 选择消息发送的队列
    在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueuef()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义private boolean sendLatencyFaultEnable = false通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:
    1. sendLatencyFaultEnable开关打开
      在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L
    2. sendLatencyFaultEnable开关关闭(默认关闭)
      采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
    /**
     * 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {

                //1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //2.采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
  • 发送封装后的RemotingCommand数据包
    在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:
    1. 根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的findBrokerAddressInPublish()方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则跟新路由信息
    2. 如果没有禁用,则发送消息前后会有钩子函数的执行executeSendMessageHookBefore()executeSendMessageHookAfter()方法
    3. 将与该消息相关信息封装成RemotingCommand数据包,其中请求码RequestCode为以下几种之一:
      • A.SEND_MESSAGE(普通发送消息)
      • B.SEND_MESSAGE_V2(优化网络数据包发送)
      • C.SEND_BATCH_MESSAGE(消息批量发送)
    4. 根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s
    5. 这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync()方法中
    6. processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象
    7. 发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间
    8. 对于异常情况,且标志位retryAnotherBrokerWhenNotStoreOK,设置为true时,在发送失败的时候,会选择换一个Broker

③ Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:

  • 消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个
  • 构建MessageExtBrokerInner
  • 调用brokerController.getMessageStore().putMessage将MessageExtBrokerInner做落盘持久化处理
  • 根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回

四、总结

消息发送的核心逻辑在sendKernelImpl方法,这里简单归纳下,主要做了以下几件事:

  1. 根据对应的messageQuene获取broker网络地址。
  2. 为消息分配全局的唯一id
  3. 压缩消息,如果消息体大小超过compressMsgBodyOverHowmuch配置的值(默认4K),则进行压缩
  4. 是否存在发送消息钩子sendMessageHookList,存在则执行钩子
  5. 构建消息发送的请求头SendMessageRequestHeader
  6. 根据不同发送方式进行消息的发送。如果失败进入循环重试
    • 同步发送(SYNC):同步阻塞等待broker处理完消息后返回结果
    • 异步发送(ASYNC):
      不阻塞等待broker处理消息的结果,通过提供回调方法,响应消息发送结果。
      这种方式的发送,RocketMQ做了并发控制,通过clientAsyncSemaphoreValue参数控制,默认值是65535。
      异步发送消息的消息重试次数是通过retryTimesWhenSendAsyncFailed控制的,但如果网络出现异常是无法发生重试的
    • 单向发送(ONEWAY):不关心消息发送是否成功,只管发送
  7. 继续判断发送消息钩子,有则执行钩子的after逻辑

相关文章

  • RocketMQ-普通消息

    一、摘要 默认消息发送超时时间为3s 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3...

  • RocketMQ-消息发送

    简介 本文通过问题入手,介绍下RocketMQ的消息发送逻辑是怎么样的。消息发送的大体逻辑图如下: 问题 首先我们...

  • RocketMQ-延时消息

    一、延时消息的使用 使用比较简单,指定message的DelayTimeLevel即可。示例代码如下: 目前roc...

  • RocketMQ-事务消息

    一、事务消息的引出 以购物场景为例,张三购买物品,账户扣款 100 元的同时,需要保证在下游的会员服务中给该账户增...

  • RocketMQ-异步消息

    异步消息 生产者

  • RocketMQ-广播消息

    广播消息 生产者 消费者

  • RocketMQ-顺序消息

    敬请期待!!

  • RocketMQ-延迟消息

    延迟消息 消息发送到Broker后,要特定的事件才会被Consumer消费。 生产者 默认配置级别

  • RocketMQ-发送消息

    运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式 同步发送,适用场景:对...

  • 消息队列之RocketMQ-事务消息

    1、从本地事务到分布式事务 我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之...

网友评论

      本文标题:RocketMQ-普通消息

      本文链接:https://www.haomeiwen.com/subject/oqdekktx.html