美文网首页spring-cloud
rocketmq如何实现分布式事务

rocketmq如何实现分布式事务

作者: 时之令 | 来源:发表于2018-07-27 19:19 被阅读2993次

    如果同一个数据源在本地事物很好控制,但是在不断发展的互联网环境下,微服务越来越流行,这个时候,需要解决分布式事物,需要保证数据的最终一致性。
    所谓分布式事务(全局事物),就是在两个不同的系统中(即两个不同的数据源,其实同一个数据源也行),无法同一个同一个spring事物去控制不同系统的事物的整体成功或者整体失败。

    分布式事务的解决方法很多,但是性能和复杂性不一样,今天说说rocketmq如何保证分布式事务的。

    rocketmq为我们提供TransactionMQProducer API的支持。这个api在发送消息的时候主要做了三件事:
    1,先发送需要发送的消息到消息中间件broker,并获取到该message的transactionId。在第一次发送的时候,该消息的状态为LocalTransactionState.UNKNOW
    2,处理本地事物。
    3,根据本地事物的执行结果,结合transactionId,找到该消息的位置,在mq中标志该消息的最终处理结果。
    整体的执行源码逻辑如下:

    
    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
            if (null == tranExecuter) {
                throw new MQClientException("tranExecutor is null", (Throwable)null);
            } else {
                Validators.checkMessage(msg, this.defaultMQProducer);
                SendResult sendResult = null;
                MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
                MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
    
                try {
                    //这里执行第一次发送消息,也就是预发送,并获取sendResult,这里包含msg的所有消息
                    sendResult = this.send(msg);
                } catch (Exception var10) {
                    throw new MQClientException("send message Exception", var10);
                }
    
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable localException = null;
                //根据预发送消息的状态做不同的处理,这里主要看SEND_OK
                switch(sendResult.getSendStatus()) {
                case SEND_OK:
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
    
    // 这里做第二步,执行业务逻辑,即本地事物,
    //具体的本地事物在LocalTransactionExecuter参数的实现类中,
    //需要根据自己的业务逻辑去写,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);会执行实
    //现类中的executeLocalTransactionBranch业务。
                        localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            this.log.info(msg.toString());
                        }
                    } catch (Throwable var9) {
                        this.log.info("executeLocalTransactionBranch exception", var9);
                        this.log.info(msg.toString());
                        localException = var9;
                    }
                    break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                }
    
                try {
    // 这里的方法,其中的localTransactionState是第二次执行业务逻辑的结果
    //可以根据这个结果,知道本地事物执行的成功还是失败。或者是异常localException,
    //这样可以根据第一次发送消息的结果sendResult,去修改mq中第一次发送消息的状态,完成第三步操作。
                    this.endTransaction(sendResult, localTransactionState, localException);
                } catch (Exception var8) {
                    this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
                }
    
                TransactionSendResult transactionSendResult = new TransactionSendResult();
                transactionSendResult.setSendStatus(sendResult.getSendStatus());
                transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
                transactionSendResult.setMsgId(sendResult.getMsgId());
                transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
                transactionSendResult.setTransactionId(sendResult.getTransactionId());
                transactionSendResult.setLocalTransactionState(localTransactionState);
                return transactionSendResult;
            }
        }
    
    

    上述第三步执行的方法endTransaction,逻辑如下:

    
     public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    
    // 获取第一次发送消息的id
            MessageId id;
            if (sendResult.getOffsetMsgId() != null) {
                id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
            } else {
                id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
            }
    
        //获取事物id
            String transactionId = sendResult.getTransactionId();
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
            EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
            requestHeader.setTransactionId(transactionId);
            requestHeader.setCommitLogOffset(id.getOffset());
    
           //根据本地事物执行状态localTransactionState,告知mq修改状态
    
            switch(localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(Integer.valueOf(8));
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(Integer.valueOf(12));
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(Integer.valueOf(0));
            }
    
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
            requestHeader.setMsgId(sendResult.getMsgId());
            String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
    //具体执行第三步完成整个事务。      
      this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
        }
    
    

    上述:
    如果第三阶段出现异常或者网络原因,就是本地事务执行成功持久化到数据库中,但是在修改mq中消息状态出现异常的时候,这样就可以出现本地和mq的消息状态的不一致问题。或者说,所有的数据不一致问题。rocketmq都会定期通过TransactionMQProducer API初始化的时候,设置的TransactionCheckListener的的实现类的checkLocalTransactionState 方法检查本地消息的状态,根据本地状态修改mq的状态

    package org.apache.rocketmq.client.producer;
    
    import org.apache.rocketmq.common.message.MessageExt;
    
    public interface TransactionCheckListener {
        LocalTransactionState checkLocalTransactionState(MessageExt var1);
    }
    
    

    这样就可以确保生产端的数据要么是本地事物执行失败,rollback mq中消息,或者本地事物执行成功,mq中的消息也是待消费的状态。

    说到这里,其实分布式事务已经完成90%。如果生产端的的第三部不是成功状态,消费端是不会消费第一阶段的消息的。所以,只要消费端消费的消息,肯定是生产端成功的消息,而消费端只要保证幂等性,就可以准确的消费消息(因为消费失败了,mq是可以重试,如果最总都没有被消费,基本是程序有bug,可以事后人工处理了。)

    相关文章

      网友评论

        本文标题:rocketmq如何实现分布式事务

        本文链接:https://www.haomeiwen.com/subject/otwgmftx.html