美文网首页
rocketMq consumer消费消息

rocketMq consumer消费消息

作者: 圣村的希望 | 来源:发表于2018-12-11 15:30 被阅读0次

      在RocketMq中消费消息有两种方式,push和pull方式:

    1. push方式会向Consumer注册一个MessageListener,在成功拉取到消息之后会立即进行回调MessageListener的方法。
    2. pull方式由应用自己主动去调用Consumer拉取消息方法去broker拉取消息,并进行消费处理,主动权都是在broker手中。

      但是两种方式都是从broker pull拉取消息到本地来进行消费。consumerGroup是一类consumer的集合名称,他是用来消费同一类消息,并且消费逻辑是一样的。consumerGroup中consumer的数量最好不要超过订阅的topic中的queue数量,不然会有consumer的闲置。
      接下来看看client端consumer消费消息的源码实现:

    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
            AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
            this.consumerGroup = consumerGroup;
            //在实例化的时候就设置了consumer的负载均衡策略,这里默认采用的均摊实现
            this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
            defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
        }
    
    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
    
                    this.copySubscription();
    
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    this.offsetStore.load();
    
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }
    
                    this.consumeMessageService.start();
    
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
        }
    
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
            this.clientConfig = clientConfig;
            this.instanceIndex = instanceIndex;
            this.nettyClientConfig = new NettyClientConfig();
            this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
            this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
            this.clientRemotingProcessor = new ClientRemotingProcessor(this);
            this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    
            if (this.clientConfig.getNamesrvAddr() != null) {
                this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
                log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
            }
    
            this.clientId = clientId;
    
            this.mQAdminImpl = new MQAdminImpl(this);
    
            this.pullMessageService = new PullMessageService(this);
    
            this.rebalanceService = new RebalanceService(this);
    
            this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
            this.defaultMQProducer.resetClientConfig(clientConfig);
    
            this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    
            log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
                this.instanceIndex,
                this.clientId,
                this.clientConfig,
                MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
        }
    

      调用consumer.start()启动了消费者,主要是做了一下几件事情:

    1. 创建了MQClientInstance实例,主要靠他来进行和broker进行通信发送请求等。
    2. 实例化OffsetStore,对于不同的消息消费类型生成对应的OffsetStore实例。broadCasting广播类型,消息的offset是存放在本地的,所以实例化LocalFileOffsetStore从本地文件中加载保存的offset信息。clustering集群消费的话,消息的offset是存放在remote broker的,所以实例化的RemoteBrokerOffsetStore从远端broker获取。
    3. 根据不同的消费类型(并行消费或顺序消费),实例化不同的消息消费类ConsumeMessageOrderlyService(顺序消费)和ConsumeMessageConcurrentlyService(并行消费)。在实例化消费类型的时候,是开启了一个线程池用来处理消费消息的,这里消费消息的线程池线程数量是可以配置的,然后就是开启了定时任务线程池用来清理过期消息等。
    4. 在实例化MQClientService的时候实例化了PullMessageService,这里是拉取消息的类,在后台进行不停地拉取消息。
    public class PullMessageService extends ServiceThread {
       @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                try {
                    PullRequest pullRequest = this.pullRequestQueue.take();
                    if (pullRequest != null) {
                        this.pullMessage(pullRequest);
                    }
                } catch (InterruptedException e) {
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }
            }
    
            log.info(this.getServiceName() + " service end");
        }
    }
    

      PullMessageService线程不停滴从pullRequestQueue阻塞队列中获取拉取消息请求进行拉取消息。如果拉取消息成功就继续包装PullRequest请求put到pullRequestQueue中拉取下一条消息,拉取失败就用后台定时任务线程,延迟一段时间(3秒)又把它放到pullRequestQueue中再次尝试获取消息。

    boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispathToConsume);
    

      在拉取消息成功的时候,会把获取的消息存放到processQueue队列中,这个队列是存放获取成功但是还未被消费的消息,然后是把获取到的消息封装成ConsumeRequest提交到相应的ConsumeMessageService中进行消费(并行消费或顺序消费)。

    顺序消费消息:顺序消费消息是在ConsumeMessageOrderlyService中处理的,他里面也封装了一个ConsumeRequest,把它丢到阻塞队列里面,在他的线程池里面取任务进行顺序消息的消费。

    //这里是控制顺序消费的关键
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    
    public Object fetchLockObject(final MessageQueue mq) {
            Object objLock = this.mqLockTable.get(mq);
            if (null == objLock) {
                objLock = new Object();
                Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
                if (prevLock != null) {
                    objLock = prevLock;
                }
            }
    
            return objLock;
        }
    

      在ConsumeRequest中是先去获取MessageQueue的锁,这个很关键,这个是确保顺序消息被顺序消费的关键,只有获取到了锁的线程才能去更进一步去消费消息。这里也可以看出顺序消息也只能有一个consumer才能确保全局的顺序消费,否则的话有多个consumer的话,每一个jvm进程中只能有一个MQClientInstance,这样在两个独立的jvm进程中是无法控制一个messageQueue只会同时被一个consumer消费消息。

    并行消息消费:并发消费消息是在ConsumeMessageConcurrentlyService中处理的,他里面也封装了一个ConsumeRequest,也是把它丢到阻塞队列中,后台线程池不停滴从队列中取任务进行并行消费。

    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    

      在ConsumeMessageConcurrentlyService中的ConsumeMessage会进行回调实现MessageListener的方法,即处理了应用程序消费消息的业务逻辑。

      consumer端消息过滤:在consumer端拉取到消息之后的回调中是会进行消息的过滤

    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
    
    //处理消息,主要是过滤消息
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
            final SubscriptionData subscriptionData) {
            PullResultExt pullResultExt = (PullResultExt) pullResult;
    
            this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
            //获取到消息后的处理
            if (PullStatus.FOUND == pullResult.getPullStatus()) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
                List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
    
                List<MessageExt> msgListFilterAgain = msgList;
                if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                    //先根据tag来进行过滤
                    for (MessageExt msg : msgList) {
                        if (msg.getTags() != null) {
                            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                                msgListFilterAgain.add(msg);
                            }
                        }
                    }
                }
    
                //根据注册的FilterMessage来进行过滤
                if (this.hasHook()) {
                    FilterMessageContext filterMessageContext = new FilterMessageContext();
                    filterMessageContext.setUnitMode(unitMode);
                    filterMessageContext.setMsgList(msgListFilterAgain);
                    this.executeHook(filterMessageContext);
                }
    
                for (MessageExt msg : msgListFilterAgain) {
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                        Long.toString(pullResult.getMinOffset()));
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                        Long.toString(pullResult.getMaxOffset()));
                }
    
                pullResultExt.setMsgFoundList(msgListFilterAgain);
            }
    
            pullResultExt.setMessageBinary(null);
    
            return pullResult;
        }
    

    相关文章

      网友评论

          本文标题:rocketMq consumer消费消息

          本文链接:https://www.haomeiwen.com/subject/iwhshqtx.html