美文网首页消息中间件选型
RocketMQ源码解析(十三)-事务消息

RocketMQ源码解析(十三)-事务消息

作者: 空挡 | 来源:发表于2019-03-09 22:46 被阅读0次

    什么是事务消息

    首先我们用一个场景来讲一下事务消息解决的问题。分布式消息队列多用来解决多个微服务之间的调用解耦,不会因为单个服务的服务质量问题而影响其它业务。比如电商场景下,一笔订单支付成功后可能要通知多个系统,如erp系统准备发货、商品系统扣减库存等,这中间如果使用消息队列来解决的话会有2种情况:
    1)先发送消息,后更新订单状态
    2)先更新订单状态,后发送消息
    在第一种情况下,如果订单更新时出现问题发生回滚,消息已经发送出去了,下游系统可能会出错。
    如果采用第二种方案,如果订单更新后,发送消息前因为系统宕机导致消息没发出去,则下游系统就不知道订单的最新状态。
    RocketMQ的事务消息就是为了解决上面的问题,它将消息和订单状态更新这2步操作放到一个事务中,要么都成功,要么都失败。下面看下它的实现原理

    RocketMQ事务消息的实现原理

    首先事务消息中用到的几个概念需要明确一下:
    本地事务,用户实现的业务逻辑,比如上面例子中的更新订单状态的逻辑,本地事务执行返回的结果可能有3种状态,1)COMMIT,代表本地业务逻辑执行成功,这种情况下消息应该发出;2)ROLLBACK,本地业务逻辑执行失败,消息不应该发;3)UNKNOWN,未知状态,可能是事务正在执行中出异常等,这种情况下消息系统不知道该如何处理,当前的逻辑是会直接丢弃掉,等待后续检查逻辑来处理。
    Prepared消息
    RocketMQ在执行本地事务之前会先发一条Prepared消息到Broker,声明事务开始,但Prepared消息不会发给Consumer。
    Commit/Rollback消息
    在本地事务执行结束后,会根据本地事务的状态来决定发送Commit/Rollback消息,用于结束事务。Broker收到这条消息后会把之前的Prepared消息真正投递给Consumer。
    下面看下事务消息交互流程,这里直接引用阿里云文档的图:

    事务消息交互流程
    1. 这里面的半消息(Half消息)即Prepared消息
    2. 第4步即发送给BrokerCommit/Rollback消息
    3. 在最新的开源版本的RocketMQ中,第5步的动作并没有启用,所以当前的开源版本如果第4步的时候失败,则这个事务就永远处于Prepared状态直到被删除。RocketMQ还在对这个功能做优化,后续应该会上新的实现版本。

    代码实现

    现在我们看下事务消息的代码实现,首先按惯例引用官方文档的demo:

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            //1、设置处理回查请求的executor
            producer.setExecutorService(executorService);
           //2、设置本地事务Listener
            producer.setTransactionListener(transactionListener);
            producer.start();
    
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    //3、发送消息
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }
    

    从上面的代码可以看出事务消息跟普通消息使用不同的Producer
    第1步,设置的executor用来处理Broker的回查请求,因为这个功能现在已经去掉了,所以这个executor其实是没用的。
    第2步,设置的Listener中,用户需要实现两个方法

    public interface TransactionListener {
        /**
         * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
         *
         * @param msg Half(prepare) message
         * @param arg Custom business parameter
         * @return Transaction state
         */
        LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    
        /**
         * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
         * method will be invoked to get local transaction status.
         *
         * @param msg Check message
         * @return Transaction state
         */
        LocalTransactionState checkLocalTransaction(final MessageExt msg);
    }
    

    第1个方法就是本地事务的实现,业务代码写在这个方法里面
    第2个方法是Broker回查消息状态的时候调用的方法,因为回查功能已经没有了,所以这个方法暂时也用不到
    下面我们看下事务Producer的代码实现

    事务消息Producer

    TransactionMQProducerDefaultMQProducer继承,所以大体逻辑和普通的Producer是一样的,除了start()方法中加了针对事务消息的初始化逻辑:

        @Override
        public void start() throws MQClientException {
            //如果Producer未设置Executor,则默认初始化一个
            this.defaultMQProducerImpl.initTransactionEnv();
            super.start();
        }
    

    start第一步就是检查下用户有没有设置executor,如果没有则默认初始化,然后就调用DefaultMQProducerImplstart()方法了,这里和普通消息没有什么区别。
    消息发送
    事务消息发送调用的DefaultMQProducerImpl.sendMessageInTransaction()方法

    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                              final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
            //1、检查TransactionListener是否存在
            TransactionListener transactionListener = getCheckListener();
            if (null == localTransactionExecuter && null == transactionListener) {
                throw new MQClientException("tranExecutor is null", null);
            }
            //2、消息校验,校验topic和body长度
            Validators.checkMessage(msg, this.defaultMQProducer);
            //3、设置消息的事务属性,为PREPARED消息
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
            try {
                //4、发送消息,和发送普通消息调用同一个方法
                sendResult = this.send(msg);
            } catch (Exception e) {
                throw new MQClientException("send message Exception", e);
            }
    
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch (sendResult.getSendStatus()) {
                case SEND_OK: {
                    try {
                        //5、当前Broker不会返回这个值
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
                        //6、使用客户端生成的唯一id作为事务ID
                        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
                        if (null != localTransactionExecuter) {//默认为空
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                        } else if (transactionListener != null) {
                            log.debug("Used new transaction API");
                            //7、消息发送成功,调用transactionListener执行本地事务
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            log.info(msg.toString());
                        }
                    } catch (Throwable e) {
                        log.info("executeLocalTransactionBranch exception", e);
                        log.info(msg.toString());
                        localException = e;
                    }
                }
                break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    //8、消息持久化失败,则事务回滚
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                    break;
                default:
                    break;
            }
    
            try {
                //9、发送结束事务消息(Commit/Rollback)
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception e) {
                log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
            }
    
            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    

    第3步,Prepared消息会在消息的自定义属性中添加标识,包含消息类型和发送的ProducerGroup
    第4步,提交消息到Broker方法和普通消息调用的是同一个,实现中唯一针对事务消息的修改就是设置了消息的sysFlag,在sendKernelImpl()方法中:

    //如果是Prepared消息,设置sysFlag
     final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
          sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
    

    第7步,消息发送成功,则回调TransactionListener的实现,执行本地事务,得到本地事务的执行状态。
    第8步,如果第4步中prepared消息虽然发送成功,但Broker持久化消息失败,本地事务不会执行,直接回滚
    第9步,根据本地事务的执行状态,发送Commit/Rollback消息给Broker,我们看下具体实现:

    public void endTransaction(
            final SendResult sendResult,
            final LocalTransactionState localTransactionState,
            final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
            final MessageId id;
            if (sendResult.getOffsetMsgId() != null) {
                id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
            } else {
                id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
            }
            String transactionId = sendResult.getTransactionId();
            //1、获取接收prepared消息的Broker地址
            final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
            EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
            requestHeader.setTransactionId(transactionId);
            //2、消息在commitLog的offset
            requestHeader.setCommitLogOffset(id.getOffset());
            //3、根据本地执行结果设置提交或回滚
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    break;
                case UNKNOW:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    break;
                default:
                    break;
            }
    
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            //4、设置消息在broker上的queueOffset
            requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
            requestHeader.setMsgId(sendResult.getMsgId());
            String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
            //5、发送结束事务消息RequestCode.END_TRANSACTION,Oneway
            this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                this.defaultMQProducer.getSendMsgTimeout());
        }
    

    第1步,获取接收prepared消息的那个broker地址,两个消息必须发到同一个broker
    第2,4步,commit/rollback消息需要携带原prepared消息的commitLog offsetqueue offset
    第5步,最后消息是用Oneway的方式提交的,也就是Broker处理无论成功还是失败,Producer不会再做处理。这里之所以是这个逻辑,是因为RocketMQ之前的版本是有回查逻辑的,当前最新版本把这个逻辑去掉后,确实大大影响了事务消息的可用性。

    Broker处理Prepared消息

    Broker处理Prepared消息是和普通消息用的同一个SendMessageProcessor,所以我们之看下针对事务消息的特殊处理逻辑。

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                            final RemotingCommand request,
                                            final SendMessageContext sendMessageContext,
                                            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
            ...
            ...
            String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                //如果是事务消息,判断broker是否支持事务消息
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
                    return response;
                }
                //存储prepare消息
                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
            } else {
                //8、调用MessageStore接口存储消息
                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            }
           return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    }
    

    Processor的代码中可以发现,针对Prepared消息是用的TransactionalMessageService来处理的,最终还是跟普通消息一样调用的MessageStore的方法来存储消息到CommitLog,但是在存储之前对消息数据做了转换 :

        //TransactionalMessageServiceImpl.prepareMessage()
        @Override
        public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
            return transactionalMessageBridge.putHalfMessage(messageInner);
        }
    
        //TransactionalMessageBridge.java
        public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
            return store.putMessage(parseHalfMessageInner(messageInner));
        }
    
        private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
            MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
            MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                String.valueOf(msgInner.getQueueId()));
            //清除sysFlag中的事务消息状态位
            msgInner.setSysFlag(
                MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
            //事务prepare消息放入统一的topic,RMQ_SYS_TRANS_HALF_TOPIC
            msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
            //queueId统一设置成0
            msgInner.setQueueId(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            return msgInner;
        }
    

    以上的代码可以看到,在将消息存到MessageStore之前,会将原始的TopicqueueId放入自定义属性中,然后将sysFlag设置成非事务消息,topic统一改成RMQ_SYS_TRANS_HALF_TOPIC,queueId设置为0。这样所有的Prepared消息都会发到同一个topic的同一个queue下面。而且因为这个topic是系统内置的,consumer不会订阅这个topic的消息,所以Prepared的消息是不会被Consumer收到的。

    Broker处理Commit/Rollback消息

    Broker使用一个专门的EndTransactionProcessor来处理Commit/Rollback 消息,逻辑如下:

    @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
            RemotingCommandException {
             ...
             ...
            //判断是来源于Producer主动发的消息还是Broker主动检查返回的消息,这里只用来记录日志
            if (requestHeader.getFromTransactionCheck()) {
                //log
            } else {
                //log
            }
            OperationResult result = new OperationResult();
           //1、如果收到的是提交事务消息
            if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
                //2、从commitLog中查出原始的prepared消息
                result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
                if (result.getResponseCode() == ResponseCode.SUCCESS) {
                   //3、检查获取到的消息是否和当前消息匹配(包括ProduceGroup、queueOffset、commitLogOffset)
                    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                    if (res.getCode() == ResponseCode.SUCCESS) {
                        //4、使用原始的prepared消息属性,构建最终发给consumer的消息
                        MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                        msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                        //5、调用MessageStore的消息存储接口提交消息,使用真正的topic和queueId
                        RemotingCommand sendResult = sendFinalMessage(msgInner);
                        if (sendResult.getCode() == ResponseCode.SUCCESS) {
                       //6、设置Prepared消息的标记位为delete
                          this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                        }
                        return sendResult;
                    }
                    return res;
                }
            } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
                //7、收到的回滚事务消息
                result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
                if (result.getResponseCode() == ResponseCode.SUCCESS) {
                    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                    if (res.getCode() == ResponseCode.SUCCESS) {
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return res;
                }
            }
            response.setCode(result.getResponseCode());
            response.setRemark(result.getResponseRemark());
            return response;
        }
    
    1. 当收到Commit消息时,Broker会根据消息中携带的offset信息去CommitLog中查出原来的Prepared消息,这也就是为什么Producer在发送最终的Commit消息的时候一定要指定是同一个Broker。消息查到后按照原来的topic和queueId,生成一条新的消息重新存到MessageStore,这样这条消息就跟普通消息一样,被Consumer收到了。
      这里第6步需要注意下,消息Commit后,理论上需要将原来的Prepared消息删除,这样Broker就能知道哪些消息一直没收到Commit/Rollback,需要去Producer回查状态。但是如果直接修改CommitLog文件,这个代价是很大的,所以RocketMQ是通过生成一个新的delete消息来标记的。这样,Broker在检查的时候只需要看下Prepared消息有没有对应的delete`消息就可以了。具体代码如下:
        @Override
        public boolean deletePrepareMessage(MessageExt msgExt) {
            if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
                log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
                return true;
            } else {
                log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
                return false;
            }
        }
        
         public boolean putOpMessage(MessageExt messageExt, String opType) {
            //选择和Prepared消息相同的queueId
            MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
                this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
            if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
                return addRemoveTagInTransactionOp(messageExt, messageQueue);
            }
            return true;
        }
        private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
            //message的topic为RMQ_SYS_TRANS_OP_HALF_TOPIC
            //消息的tags值是d,body中存储的是prepared消息的queueOffset
            Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
                String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
            writeOp(message, messageQueue);
            return true;
        }
    
    1. 当收到Rollback事务消息,则不需要重新生成新消息发送,只需要将原来的消息标记位置成delete就可以了。

    总结

    事务消息通过2次消息确认和Producer回调用户本地事务,来解决用户业务逻辑和消息发送的原子性问题。当前版本中事务消息因为性能问题取消了Broker对长时间未delete的Prepared消息的状态回查,导致事务消息的高可用有所降低。如果要使用事务消息需要等待后期版本更新,或者用户自己实现回查逻辑。

    相关文章

      网友评论

        本文标题:RocketMQ源码解析(十三)-事务消息

        本文链接:https://www.haomeiwen.com/subject/sxufpqtx.html