美文网首页
rocket源码 顺序消息和事务消息

rocket源码 顺序消息和事务消息

作者: Ace_b90f | 来源:发表于2020-09-15 13:10 被阅读0次
    顺序消息的实现

    顺序消息进行消费时,若是第一次消费失败,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT,下一次会继续消费此消息。

    顺序消息的消费失败时的重试逻辑,具体代码在ProccessQueue中,顺序消费时手动从processQueue中取消息,内部是从msgTreeMap中取出消息后,将消息添加到consumingMsgOrderlyTreeMap中,若是消费成功,将该消息从consumingMsgOrderlyTreeMap中删除即可。若是消费失败,执行makeMessageToConsumeAgain方法,将这些消息再放回msgTreeMap。

    顺序消费时有回滚和重试的逻辑,但是新版本不建议使用。回滚和重试的逻辑和上面相同,回滚时将消息重新放回treeMap,提交时不用操作treeMap,但是需要根据consumingMsgOrderlyTreeMap找到当前消费的offset,从下一个继续消费。

    顺序消息消费时使用同一个线程,可以看一下ConsumeMessageOrderlyService

    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    this.consumeExecutor = new ThreadPoolExecutor(
                this.defaultMQPushConsumer.getConsumeThreadMin(),
                this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代码...
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));
    

    因为queue的长度是Integer.MAX_VALUE,因此在进行消费时使用的是一个线程,并且有序执行。

    顺序消息的消费使用同一个线程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中实现的。

    // ProcessQueue
    
    private volatile boolean consuming = false;
    
    
        public boolean putMessage(final List<MessageExt> msgs) {
            boolean dispatchToConsume = false;
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    int validMsgCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                        if (null == old) {
                            validMsgCnt++;
                            this.queueOffsetMax = msg.getQueueOffset();
                            msgSize.addAndGet(msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(validMsgCnt);
                    // 如果有消息可以进行消费,并且当前queue没有消费,则将dispatchToConsume和consuming置为true
                    if (!msgTreeMap.isEmpty() && !this.consuming) {
                        dispatchToConsume = true;
                        this.consuming = true;
                    }
    
                    if (!msgs.isEmpty()) {
                        MessageExt messageExt = msgs.get(msgs.size() - 1);
                        String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                        if (property != null) {
                            long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                            if (accTotal > 0) {
                                this.msgAccCnt = accTotal;
                            }
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
    
            return dispatchToConsume;
        }
    
    // ConsumeMessageOrderlyService
    
        public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispathToConsume) {
            if (dispathToConsume) { // putMessage返回true时,才将request提交到线程池
            // 如果已经开始对该queue进行消费了,就不会再次提交任务
                ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
                this.consumeExecutor.submit(consumeRequest);
            }
        }
    
    // 提交给线程池的任务
    // 主要代码
        class ConsumeRequest implements Runnable {
    
            @Override
            public void run() {
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                        // 如果可以继续消费,直接在当前线程中轮询消费该ProcessQueue即可
                        for (boolean continueConsume = true; continueConsume; ) {
                            // 在consumerImpl中的pullMessage方法中持续给ProcessQueue添加消息
                            // 手动从ProcessQueue中取消息
                            List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                            if (!msgs.isEmpty()) {
                                try {
                                    this.processQueue.getLockConsume().lock();
                                    //消费消息
                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                } catch (Throwable e) {
                                   
                                } finally {
                                    this.processQueue.getLockConsume().unlock();
                                }
                                // 处理消费结果,若是成功继续消费
                                continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                            } else {
                                continueConsume = false;
                            }
                        }
                    } 
            }
    

    看代码可以发现,如果顺序消息消费失败的话,即消费返回SUSPEND_CURRENT_QUEUE_A_MONENT时,当前线程会停止消费,在processConsumeResult时,会提交新的任务到线程池,在新的线程中继续消费该消息。

    核心逻辑是保证一个ProcessQueue只在一个线程中轮询消费消息。

    发送顺序消息时会添加一个队列选择器,将需要有序的消息发送到同一个队列。消费端拉取特定queue的数据时天生有序,在消费时使用同一个线程进行消费,因此就实现了顺序消息。

    事务消息

    二阶段提交加补偿机制

    第一阶段提交消息到broker,broker将topic修改为RMQ_SYS_TRANS_HALF_TOPIC,存入对consumer不可见的topic/queue。如果此阶段写入成功,执行transactionListener.executeLocalTransaction()

    第二阶段,根据本地事务的执行结果提交或者回滚第一阶段提交至broker的消息,这里使用的是OneWay方法,可靠性低,可能出现失败或者超时的情况。

    broker端处理RequestCode.END_TRANSACTION的请求,如果是commit,则将原来的消息取出,更改为正确的topic/queue,并进行落盘,然后添加Op状态。如果是rollback,则直接添加Op状态即可。

    添加Op状态是将消息添加到Op队列中,Op队列是为了补偿逻辑时减少判断。

    补偿逻辑:

    BrokerController启动时会启动TransactionMessageCheckService,默认每隔60s检查一次HALF_TOPIC下所有的queue中的消息,检查步骤如下

    • 先判断当前queue和对应的opQueue是否添加过消息,如果没有,遍历下一个queue,若有,进行下一步判断
    • 获取对应的opQueue中的消息,若是没有消息,遍历下一个queue,若有,进行下一步判断
    • 遍历当前queue
    • 如果当前偏移量已经添加了oP状态,直接遍历至下一个偏移量,否则进行下一步判断
    • 获取当前消息,若为null,遍历下一个偏移量,若不为null,进行下一步判断
    • 若当前消息需要舍弃或者跳过,遍历下一个偏移量,否则进行下一步判断
    • 判断当前消息是否需要check,若暂时不需要,重新走判断流程
    • 若是需要check,broker端给producer发送CHECK_TRANSACTION_STATE消息,producer端接收到消息后,执行TransactionListener.checkLocalTransaction,将check结果回发给broker。

    相关文章

      网友评论

          本文标题:rocket源码 顺序消息和事务消息

          本文链接:https://www.haomeiwen.com/subject/ckvtyktx.html