美文网首页
RocketMQ特性--事务消息【源码笔记】

RocketMQ特性--事务消息【源码笔记】

作者: 瓜农老梁 | 来源:发表于2019-08-18 11:45 被阅读0次
    目录
    一、问题思考
    二、事务消息客户端发送流程
       1.事务发送与普通启动差异
       2.事务消息发送调用链
       3.事务消息发送分析
       4.事务消息发送结果分析
       5.结束事务分析
    三、事务消息服务端存储流程
       1.事务消息存储调用链
       2.事务半消息存储代码分析(一)
       3.事务半消息存储代码分析(二)
    四、事务消息服务端响应结束事务请求
       1.处理未知类型请求
       2.处理事务提交请求
       3.处理事务回滚请求
    五、事务消息服务端状态回查   
       1.事务回查线程类调用链
       2.事务回查逻辑
       3.客户端响应事务回查
    六、事务消息交互示意图   
    
    一、问题思考

    从官方给的例子入手,代码如下:
    示例类:org.apache.rocketmq.example.transaction.TransactionProducer.java

    public static void main(String[] args) throws MQClientException, InterruptedException {
        //@1 定义TransactionListener
        TransactionListener transactionListener = new TransactionListenerImpl();
        //@2 使用事务发送Producer
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //@3 定义线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //设置线程池
        producer.setExecutorService(executorService);
        //设置监听器
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr("127.0.0.1:9876");
        //@4 发送者启动
        producer.start();
    
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //@5 消息发送
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
    
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        //发送者关闭
        producer.shutdown();
    }
    

    从上面客户端例子中思考一些问题:
    1. @1定义TransactionListener做什么用?
    2. @2定义的TransactionMQProducer与普通Produer区别在哪里?
    3. @3定义线程池executorService又是干啥的?
    4. @4事务发送者启动发送流程是怎么样?
    5. 发送事务消息如何和Broker进行交互的?

    二、事务消息客户端发送流程
    1.事务发送与普通启动差异
    @1 producer.start();
    @2 TransactionMQProducer#start
       this.defaultMQProducerImpl.initTransactionEnv();
       super.start();
    @3 DefaultMQProducerImpl#initTransactionEnv()
       this.checkExecutor = producer.getExecutorService();
    

    小结:事务发送时比普通发送启动多了initTransactionEnv操作,即:给ExecutorService checkExecutor赋值。

    2.事务消息发送调用链
    @1 SendResult sendResult = producer.sendMessageInTransaction
    @2 TransactionMQProducer#sendMessageInTransaction
    @3 DefaultMQProducerImpl#sendMessageInTransaction
    
    3.事务消息发送分析

    方法:DefaultMQProducerImpl#sendMessageInTransaction

    //@1
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    //@2 表示消息的prepare消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    //@3 生产者组,用于回查本地事务事,从生产者组中选择随机选择一个生产者即可
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
    //@4 消息发送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    

    方法:DefaultMQProducerImpl#sendKernelImpl

    //事务消息发送,设置PREPARED标记
    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
    //@5 请求header中设置事务标记
    requestHeader.setSysFlag(sysFlag);
    //@6 发送消息请求的RequestCode
    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    

    小结:
    @1获取TransactionListener即示例代码传入的Listener
    @2在消息属性中加入PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"即事务半消息
    @3设置ProducerGroup Broker在事务回查时调用
    @4事务消息发送采用同步发送,发送流程与普通消息发送一致
    @5请求header中设置事务标记SEND_MESSAGE = 10

    4.事务消息发送结果分析

    方法:DefaultMQProducerImpl#sendMessageInTransaction

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    switch (sendResult.getSendStatus()) {
        //@1 
        case SEND_OK: {
        ...
        //@2 执行本地事务
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
        ...    
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
        default:
        break;
    }
    

    小结:
    @1 发送半消息(Prepared)消息成功,设置transactionId。
    @2 发送半消息成功后,通过transactionListener回调客户端查询本地事务执行情况,并返回事务执行状态。
    LocalTransactionState有COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW三种状态。

    5.结束事务分析

    方法:DefaultMQProducerImpl#sendMessageInTransaction

    try {
        //@1 结束事务,根据返回的事务状态执行提交、回滚、暂时不处理
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    ...
    return transactionSendResult;
    

    方法:DefaultMQProducerImpl#endTransaction

    ...
    switch (localTransactionState) {
    //@2 设置事务提交标记Header
    case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
    break;
    //@3 设置事务回滚标记Header
    case ROLLBACK_MESSAGE:
    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
    break;
    //@4 设置事务未知标记Header
    case UNKNOW:
    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
        break;
    default:
        break;
    }
    //@5 通过一次发送方式向Broker提交事务   this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
    }
    

    小结:
    @1 根据本地事务执行返回的状态localTransactionState,调用结束事务方法
    @2 requestHeader设置事务提交标记0x2 << 2=8
    @3 requestHeader设置事务回滚标记0x3 << 2=12
    @4 requestHeader设置未知标记0
    @5 通过一次发送方式向Broker提交事务 RequestCode为END_TRANSACTION = 37

    三、事务消息服务端存储流程
    1.事务消息存储调用链
    @1 SendMessageProcessor#processRequest
       response = this.sendMessage(ctx, request, mqtraceContext, requestHeader)
    @2 SendMessageProcessor#sendMessage
    
    2.事务半消息存储代码分析(一)

    方法:SendMessageProcessor#sendMessage

    //@1 可以通过配置来是否接受事务消息存储
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()){
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(
        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending transaction message is forbidden");
    return response;
    }
    //@2 prepare消息存储
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    

    小结:
    @1 可以通过Broker配置属性rejectTransactionMessage来决定是否接受事务消息请求,默认为false即接受。
    @2 半消息存储

    3.事务半消息存储代码分析(二)

    方法:TransactionalMessageBridge#putHalfMessage

    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner){
        return store.putMessage(parseHalfMessageInner(messageInner));
    }
    

    方法:TransactionalMessageBridge#parseHalfMessageInner

    //@1 备份原主题
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //@2 备份原queueID
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //@3 重置sysFlag
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //@3 主题变更 RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //@4 消息队列变更为0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    

    小结:半消息在存储前将存储的主题设置为RMQ_SYS_TRANS_HALF_TOPIC,将原来的Topic备份到属性中,同时也备份了原来的QueueId。这也是为什么半消息不会被消费者消费的原因。

    四、事务消息服务端响应结束事务请求
    1.处理未知类型请求

    方法:EndTransactionProcessor#processRequest

    case MessageSysFlag.TRANSACTION_NOT_TYPE: {
        LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                + "RequestHeader: {} Remark: {}",
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.toString(),
            request.getRemark());
        //@1 
        return null;
    }
    

    小结:结束事务在处理未知类型TRANSACTION_NOT_TYPE时,只打印告警日志不做处理。

    2.处理事务提交请求

    半消息查找。方法:EndTransactionProcessor#processRequest

     if (MessageSysFlag.TRANSACTION_COMMIT_TYPE 
        == requestHeader.getCommitOrRollback()){
    //@1 将prepare消息找出来
    result = this.brokerController.getTransactionalMessageService()
    .commitMessage(requestHeader);
    ...
    //@2 
    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
    
    }
    

    半消息还原。方法:EndTransactionProcessor#endMessageTransaction

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    //@3 置换为原来的Topic
    msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    //@4 置换为原来的QueueId
    msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    ...
    //清除属性设置
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
    return msgInner;
    

    小结:
    @1 根据偏移量将半消息查找出来
    @2 将存储在RMQ_SYS_TRANS_HALF_TOPIC还原
    @3 置换为原来的Topic
    @4 置换为原来的QueueId

    还原后消息存储。方法:EndTransactionProcessor#processRequest

    //@1 新组装的消息存储(提交)
    RemotingCommand sendResult = sendFinalMessage(msgInner); 
    if (sendResult.getCode() == ResponseCode.SUCCESS) {
    //@2 删除prepare消息 是将消息存储于RMQ_SYS_TRANS_OP_HALF_TOPIC中
    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    

    小结:
    @1 将还原后的消息存储
    @2 删除半消息消息

    半消息删除。方法:TransactionalMessageServiceImpl#putOpMessage

    public boolean putOpMessage(MessageExt messageExt, String opType) {
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
    }
    
    private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    //@1 主题变更为RMQ_SYS_TRANS_OP_HALF_TOPIC
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,      String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    //@2 存储消息
    writeOp(message, messageQueue);
    return true;
    }
    

    小结:半消息的删除是将Topic从RMQ_SYS_TRANS_HALF_TOPIC变更为RMQ_SYS_TRANS_OP_HALF_TOPIC存储到日志文件,依靠文件删除机制删除。

    3.处理事务回滚请求

    方法:EndTransactionProcessor#processRequest

    //@1 查找半消息
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
    if (res.getCode() == ResponseCode.SUCCESS) {
    //@2 删除prepare消息 是将消息存储于RMQ_SYS_TRANS_OP_HALF_TOPIC中      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    return res;
    }
    }
    

    小结:处理事务回滚请求,将半消息查找出来,将其删除即:将Topic从RMQ_SYS_TRANS_HALF_TOPIC变更为RMQ_SYS_TRANS_OP_HALF_TOPIC并存储,依靠文件删除机制删除。

    五、事务消息服务端状态回查
    1.事务回查线程类调用链

    线程类初始化:TransactionalMessageCheckService

    @1 main(String[] args)
    start(createBrokerController(args));
    @2 createBrokerController
    @3 initialize()
    @4 initialTransaction()
    this.transactionalMessageCheckService = 
    new TransactionalMessageCheckService(this);
    

    小结:在Broker初始化启动时,TransactionalMessageCheckService线程类也随着启动初始化。

    2.事务回查逻辑

    方法:TransactionalMessageCheckService#run

    //@1 时间间隔为60秒
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    

    方法:TransactionalMessageCheckService#onWaitEnd

    //@2 transactionTimeOut默认6秒
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    //@3 最大核查次数为15次
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    

    小结:事务回查每隔60秒执行一次,一次执行超时时间为6秒,最大回查次数为15次。

    回查逻辑(一)。方法:TransactionalMessageServiceImpl#check

    //@1 
    Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
    
    for (MessageQueue messageQueue : msgQueues) {
        //获取对应的RMQ_SYS_TRANS_OP_HALF_TOPIC中的队列
        MessageQueue opQueue = getOpQueue(messageQueue); 
        //半消息消费队列中偏移量
        long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); 
        //OP已删除消费队列中的偏移量
        long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); 
    }
    //@2 
    PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);              
    

    方法:TransactionalMessageServiceImpl#fillOpRemoveMap

    //@3 
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    List<MessageExt> opMsg = pullResult.getMsgFoundList();
    for (MessageExt opMessageExt : opMsg) {
        Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));    
        if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
           //已经处理过的消息即commit和rollback
           if (queueOffset < miniOffset) {
                doneOpOffset.add(opMessageExt.getQueueOffset()); 
            } else {
                //已经处理删除过了,但是半消息还没有更新
                removeMap.put(queueOffset, opMessageExt.getQueueOffset());
            }
        } else {
            log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
        }
    }
    

    小结:
    @1 从半消息队列中查找消息队列
    @2 opQueue队列中的消息均为已经删除的半消息,需要检查下是否已经删除了,当时半消息队列还没有更新。
    @3 miniOffset为半消息消费队列中的最大偏移量;queueOffset为删除消费队列的消息偏移量;通过比较两者来确定是否已经删除了,而半消息状态还没有更新,并将这类消息存储在removeMap中。

    回查逻辑(二)。方法:TransactionalMessageServiceImpl#check

     while (true) {
        //查找消息
        GetResult getResult = getHalfMsg(messageQueue, i);
        //@1 已经处理过了,半消息滞后了,偏移量继续递增往下走
        if (removeMap.containsKey(i)) { 
            log.info("Half offset {} has been committed/rolled back", i);
            removeMap.remove(i);
        }
        //@2 needSkip 超过存储时间(默认3天) needDiscard 超过回查次数,默认15次
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        //@3 消息存储时间大于开始时间暂不处理
        if (msgExt.getStoreTimestamp() >= startTime) {
           break;
        }
        //@4 存储的时间小于需要回查的时间 跳过
        if (valueOfCurrentMinusBorn < checkImmunityTime) { 
            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                newOffset = i + 1;
                i++;
                continue;
            }
        }
        
        //接着往下处理
        newOffset = i + 1; 
        i++;   
    
     }
    

    小结
    @1 removeMap(即已删除队列有而半消息队列未更新的消息)有则不在处理跳过该消息。
    @2 超过存储时间或者回查次数超过15次不再处理
    @3 消息存储时间大于核查程序开始时间暂不处理
    @4 如果定义了回查的时间间隔需要判断是否到时间了

    回查逻辑(三)。方法:TransactionalMessageServiceImpl#check

    if (isNeedCheck) {
        //@1 
        if (!putBackHalfMsgQueue(msgExt, i)) {
            continue;
        }
        //@2 
        listener.resolveHalfMsg(msgExt);
    }} else {
      
    }
    //@3 
    //保存prepare消息队列的回查消费进度
    if (newOffset != halfOffset) { 
        transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
    }
    long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
    //保存OP消费进度
    if (newOpOffset != opOffset) { 
        transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
    }
    

    小结:
    @1 将半消息重新存储在RMQ_SYS_TRANS_HALF_TOPIC中,由于本次回查尚未知道结果,所以进行存储。
    @2 发到客户端进行回查,回查的RequestCode为CHECK_TRANSACTION_STATE = 39,根据ProductGroup随机获取客户端通道Channel进行回查。
    @3 保存半消息和已处理消息的消费进度。

    3.客户端响应事务回查

    方法:ClientRemotingProcessor#checkTransactionState

    producer.checkTransactionState(addr, messageExt, requestHeader);
    

    方法:MQProducerInner#checkTransactionState

    //@1
    localTransactionState = transactionListener.checkLocalTransaction(message);
    //@2 
    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                            3000);
    

    小结:
    @1 执行本地事务回查并返回事务回查状态
    @2 将事务回查状态提交到Broker

    六、事务消息交互示意图
    事务消息交互 .png

    作者老梁,哈啰出行高级技术专家,参与了《RocketMQ技术内幕》审稿工作。专注后端中间件方向,已陆续发表RocketMQ系列、Kafka系列、gRPC系列、Sentinel系列、Java NIO系列。其中RocketMQ系列已发表40余篇。源码、实战、原理、调优期待与你一起学习。


    公众号.jpg
    个人微信.png

    相关文章

      网友评论

          本文标题:RocketMQ特性--事务消息【源码笔记】

          本文链接:https://www.haomeiwen.com/subject/fyrrsctx.html