美文网首页实时数据相关我爱编程Framwork
RocketMQ学习-消息发布和订阅

RocketMQ学习-消息发布和订阅

作者: 程序熊大 | 来源:发表于2018-03-31 23:11 被阅读1998次

    前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。

    一、RocketMQ消息模型

    屏幕快照 2018-03-31 14.50.41.png

    在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。

    producer会轮询向指定topic的mq集合发送消息。

    consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

    consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。

    二、消息发送

    生产者Demo

    首先给出代码,

    package com.javadu.chapter8rocketmq.message;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 作用: 同步发送消息
     * User: duqi
     * Date: 2018/3/29
     * Time: 13:52
     */
    @Component
    public class ProducerDemo {
    
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQProducer() {
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
    
            defaultMQProducer.setNamesrvAddr(namesrvAddr);
    
            try {
                defaultMQProducer.start();
    
                Message message = new Message("TopicTest", "TagA",
                                              "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                for (int i = 0; i < 100; i++) {
                    SendResult sendResult = defaultMQProducer.send(message);
                    System.out.println("发送消息结果, msgId:" + sendResult.getMsgId() +
                                       ", 发送状态:" + sendResult.getSendStatus());
                }
    
            } catch (MQClientException | UnsupportedEncodingException | InterruptedException
                | RemotingException | MQBrokerException e) {
                e.printStackTrace();
            } finally {
                defaultMQProducer.shutdown();
            }
        }
    
    }
    

    生产者中有两个属性:

    • name server的地址,用于获得broker的相关信息
    • 生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。

    RocketMQ中的消息,使用Message表示,代码定义如下:

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
    
        private String topic;
        private int flag;
        private Map<String, String> properties;
        private byte[] body;
    
        public Message() {
        }
        //省略了getter和setter方法
    }
    
    • topic:该消息将要往哪个topic发
    • flag:可以用作消息过滤
    • properties:扩展字段,可以做一些系统级别的通用值的透传,例如skywalking的segmentId
    • body:消息内容

    每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:

    public class SendResult {
        //发送状态
        private SendStatus sendStatus;
        //消息ID,用于消息去重、消息跟踪
        private String msgId;
        private MessageQueue messageQueue;
        private long queueOffset;
        //事务ID
        private String transactionId;
        private String offsetMsgId;
        private String regionId;
        //是否需要跟踪
        private boolean traceOn = true;
    
        public SendResult() {
        }
        //省略了构造函数、getter和setter等一系列方法
    }
    

    在这个demo中,我们是将消息内容和消息状态一并打印到控制台。

    消息发送源码分析

    在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。

    屏幕快照 2018-03-31 11.51.36.png

    首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。

    MQProducer.png

    在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:

    • 根据服务状态决定接下来的动作
    • 对于CREATE_JUST状态
      • 设置服务状态
      • 检查配置
      • 获取或创建MQClientInstance实例
      • 将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map
      • 填充topicPublishInfoTable数据结构
      • 启动生产者
    • 对于RUNNING、START_FAILED和SHUTDOWN_ALREADY,抛出异常
     public void start(final boolean startFactory) throws MQClientException {
            //根据当前的服务状态决定接下来的动作
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
    
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
    
                    //创建一个客户端工厂
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                    //将生产者注册到指定producer group
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
                    
                    //填充topicPublishInfoTable
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    
                    if (startFactory) {
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            //给该producer连接的所有broker发送心跳消息
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    

    顺着mQClientFactory.start()往下跟,可以进一步了解生产者的细节,主要步骤有:

    • 建立请求响应通道
    • 启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。
    • 启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。
      public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // Start pull service
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    

    生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。

    屏幕快照 2018-03-31 12.26.32.png

    这里我们只看最简单的send(Message message)方法,最终在DefaultMQProducerImpl中实现:

        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            //确认生产者状态正常
            this.makeSureStateOK();
            //检查消息的合法性
            Validators.checkMessage(msg, this.defaultMQProducer);
    
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            //获取消息的目的地:Topic信息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                //计算出消息的投递次数,如果是同步投递,则是1+重试次数,如果不是同步投递,则只需要投递一次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                //一个broker集群有不同的broker节点,lastBrokerName记录了上次投递的broker节点,每个broker节点
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    //选择一个要发送的消息队列
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            //投递消息
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            //根据消息发送模式,对消息发送结果做不同的处理
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
    
                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
    
                if (sendResult != null) {
                    return sendResult;
                }
    
                String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                    times,
                    System.currentTimeMillis() - beginTimestampFirst,
                    msg.getTopic(),
                    Arrays.toString(brokersSent));
    
                info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    
                MQClientException mqClientException = new MQClientException(info, exception);
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
                }
    
                throw mqClientException;
            }
    
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null == nsList || nsList.isEmpty()) {
                throw new MQClientException(
                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
            }
    
            throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
    

    发送消息的主要过程如下:

    • 首先检查生产者和消息的合法性
    • 然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:
    public class TopicPublishInfo {
        //是否顺序消息
        private boolean orderTopic = false;
        private boolean haveTopicRouterInfo = false;
        //维护该topic下用于的消息队列列表
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
        //计算下一次该投递的队列,这里应用ThreadLocal,即使是同一台机器中,每个producer实例都有自己的队列
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        private TopicRouteData topicRouteData;
    
        //省略了getter和setter方法
        
        //选择指定lastBrokerName上的下一个mq
        public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
            if (lastBrokerName == null) {
                return selectOneMessageQueue();
            } else {
                int index = this.sendWhichQueue.getAndIncrement();
                for (int i = 0; i < this.messageQueueList.size(); i++) {
                    int pos = Math.abs(index++) % this.messageQueueList.size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = this.messageQueueList.get(pos);
                    if (!mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
                return selectOneMessageQueue();
            }
        }
    
        //选择当前broker节点的下一个mq
        public MessageQueue selectOneMessageQueue() {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            return this.messageQueueList.get(pos);
        }
    }
    
    • 选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。
    • 投递消息
    • 根据消息队列运行模式,针对投递结果做不同的处理。

    二、消息消费

    消费者Demo

    消费者里有个属性需要看下:

    • consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。
    • nameServer的地址:name server地址,用于获取broker、topic信息

    消费者Demo里做了以下几个事情:

    • 设置配置属性
    • 设置订阅的topic,可以指定tag
    • 设置第一次启动的时候,从message queue的哪里开始消费
    • 设置消息处理器
    • 启动消费者
    package com.javadu.chapter8rocketmq.message;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 作用:
     * User: duqi
     * Date: 2018/3/29
     * Time: 14:00
     */
    @Component
    public class ConsumerDemo {
    
        /**
         * 消费者的组名
         */
        @Value("${apache.rocketmq.consumer.consumerGroup}")
        private String consumerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQPushConsumer() {
            //消费者的组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
    
            //指定NameServer地址,多个地址以 ; 隔开
            consumer.setNamesrvAddr(namesrvAddr);
            try {
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("TopicTest", "TagA");
    
                //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                //如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
    
                            //输出消息内容
                            System.out.println("messageExt: " + messageExt);
    
                            String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
    
                            //输出消息内容
                            System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //稍后再试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    

    消费者源码分析

    前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。

    MQConsumer.png

    在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:

        /**
         * Subscribe a topic to consuming subscription.
         *
         * @param topic topic to subscribe.
         * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
         * if null or * expression,meaning subscribe all
         * @throws MQClientException if there is any client error.
         */
        @Override
        public void subscribe(String topic, String subExpression) throws MQClientException {
            this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
        }
    

    第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:

        public void subscribe(String topic, String subExpression) throws MQClientException {
            try {
                //构建包含订阅信息的对象,并放入负载平衡组件维护的map中,以topic为key
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subExpression);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                //如果已经跟broker集群建立连接,则给所有的broker节点发送心跳消息
                if (this.mQClientFactory != null) {
                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }
    

    在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值

    • CONSUME_FROM_LAST_OFFSET,默认值,表示从上次停止时的地方开始消费
    • CONSUME_FROM_FIRST_OFFSET,从队列的头部开始消费
    • CONSUME_FROM_TIMESTAMP,从指定的时间点开始消费

    ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):

        /**
         * Register a callback to execute on message arrival for concurrent consuming.
         *
         * @param messageListener message handling callback.
         */
        @Override
        public void registerMessageListener(MessageListenerConcurrently messageListener) {
            this.messageListener = messageListener;
            this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
        }
    

    最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:

    • 检查配置
    • 将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;
    • 负载均衡组件的几个属性的设置
    • 处理不同消息模式(集群模式或广播模式)的配置
    • 处理顺序消费和并发消费的不同配置
    • 将消费者信息和consumer group注册到MQ客户端实例的consumerTable中
    • 启动消费者客户端

    参考资料

    1. 分布式开放消息系统(RocketMQ)的原理与实践
    2. 买好车提供的rocketmq-spring-boot-starter
    3. Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控

    相关文章

      网友评论

      • bed90b386e48:你这个idea的配色看着很舒服,是用的哪个?

      本文标题:RocketMQ学习-消息发布和订阅

      本文链接:https://www.haomeiwen.com/subject/tuymcftx.html