美文网首页MQ
RocketMQ 与 Spring Boot整合(七、事务消息)

RocketMQ 与 Spring Boot整合(七、事务消息)

作者: 梅西爱骑车 | 来源:发表于2020-08-19 10:24 被阅读0次

在分布式消息队列中,目前唯一提供完整的事务消息的,只有 RocketMQ 。关于这一点,还是可以鼓吹下的。

可能会有人说,RabbitMQ 和 Kafka 也有事务消息啊,也支持发送事务消息的发送,以及后续的事务消息的 commit提交或 rollbackc 回滚。但是要考虑一个极端的情况,在本地数据库事务已经提交的时时候,如果因为网络原因,又或者崩溃等等意外,导致事务消息没有被 commit ,最终导致这条事务消息丢失,分布式事务出现问题。

相比来说,RocketMQ 提供事务回查机制,如果应用超过一定时长未 commit 或 rollback 这条事务消息,RocketMQ 会主动回查应用,询问这条事务消息是 commit 还是 rollback ,从而实现事务消息的状态最终能够被 commit 或是 rollback ,达到最终事务的一致性。
虽然说 RabbitMQ、Kafka 并未提供完整的事务消息,但是社区里,已经基于它们之上拓展,提供了事务回查的功能。例如说:Myth ,采用消息队列解决分布式事务的开源框架, 基于 Java 语言来开发(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架进行分布式事务。
下面,我们开始本小节的示例。

7.1 Demo07Message

创建 [Demo07Message]消息类,提供给当前示例使用。代码如下:

package com.ebadagang.springboot.rocketmq.message;

/**
 * 示例 07 的 Message 消息
 */
public class Demo07Message {

    public static final String TOPIC = "DEMO_07";

    /**
     * 编号
     */
    private Integer id;

    public Demo07Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo07Message{" +
                "id=" + id +
                '}';
    }

}

7.2 Demo07Producer

创建 [Demo07Producer]类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现发送事务消息。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo07Message;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Demo07Producer {

    private static final String TX_PRODUCER_GROUP = "demo07-producer-group";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public TransactionSendResult sendMessageInTransaction(Integer id) {
        // <1>创建 Demo07Message 消息
        Message<Demo07Message> message = MessageBuilder.withPayload(new Demo07Message().setId(id))
                .build();
        // <2>发送事务消息
        return rocketMQTemplate.sendMessageInTransaction(Demo07Message.TOPIC, message,
                id);
    }

    @RocketMQTransactionListener()
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return rollback, commit or unknown
            logger.info("[executeLocalTransaction][执行本地事务,消息:{} arg:{}]", msg, arg);
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return rollback, commit or unknown
            logger.info("[checkLocalTransaction][回查消息:{}]", msg);
            return RocketMQLocalTransactionState.COMMIT;
        }

    }

}

<1> 处,创建内容为 Demo07Message 的 Spring Messaging Message 消息。
<2> 处,调用 RocketMQTemplate#sendMessageInTransaction(...) 方法,发送事务消息。我们来看看该方法的方法参数,代码如下:

// RocketMQTemplate.java

/**
 * Send Spring Message in Transaction
 *
 * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
 * @param destination     destination formats: `topicName:tags`
 * @param message         message {@link org.springframework.messaging.Message}
 * @param arg             ext arg
 * @return TransactionSendResult
 * @throws MessagingException
 */
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
    final Message<?> message, final Object arg) throws MessagingException {
    try {
        TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
        org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
        return txProducer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }
}
  • 方法参数 txProducerGroup ,事务消息的生产者分组。因为 RocketMQ 是回查(请求)指定指定生产分组下的 Producer ,从而获得事务消息的状态,所以一定要正确设置。这里,我们设置了 "demo07-producer-group" 。
  • 方法参数 destination ,消息的 Topic + Tag 。
  • 方法参数 message ,消息,没什么特别。
  • 方法参数 arg ,后续我们调用本地事务方法的时候,会传入该 arg 。如果要传递多个方法参数给本地事务的方法,可以通过数组,例如说 Object[]{arg1, arg2, arg3} 这样的形式。

在 Demo07Producer 类中,创建一个内部类 TransactionListenerImpl ,实现 MQ 事务的监听。代码如下:

// Demo07Producer.java

@RocketMQTransactionListener(txProducerGroup = TX_PRODUCER_GROUP)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // ... local transaction process, return rollback, commit or unknown
        logger.info("[executeLocalTransaction][执行本地事务,消息:{} arg:{}]", msg, arg);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // ... check transaction status and return rollback, commit or unknown
        logger.info("[checkLocalTransaction][回查消息:{}]", msg);
        return RocketMQLocalTransactionState.COMMIT;
    }
  • 在类上,添加 @RocketMQTransactionListener 注解,声明监听器的是生产者分组是 "demo07-producer-group"的 Producer 发送的事务消息。
  • 实现 RocketMQLocalTransactionListener 接口,实现执行本地事务和检查本地事务的方法。
  • 实现 #executeLocalTransaction(...) 方法,实现执行本地事务。
    • 注意,这是一个模板方法。在调用这个方法之前,RocketMQTemplate 已经使用 Producer 发送了一条事务消息。然后根据该方法执行的返回的 RocketMQLocalTransactionState 结果,提交还是回滚该事务消息。
    • 这里,我们为了模拟 RocketMQ 回查 Producer 来获得事务消息的状态,所以返回了 RocketMQLocalTransactionState.UNKNOWN 未知状态。
  • 实现 #checkLocalTransaction(...) 方法,检查本地事务。
    • 在事务消息长事件未被提交或回滚时,RocketMQ 会回查事务消息对应的生产者分组下的 Producer ,获得事务消息的状态。此时,该方法就会被调用。
    • 这里,我们直接返回 RocketMQLocalTransactionState.COMMIT 提交状态。

一般来说,有两种方式实现本地事务回查时,返回事务消息的状态。

第一种,通过 msg 消息,获得某个业务上的标识或者编号,然后去数据库中查询业务记录,从而判断该事务消息的状态是提交还是回滚。

第二种,记录 msg 的事务编号,与事务状态到数据库中。

  • 第一步,在 #executeLocalTransaction(...) 方法中,先存储一条 idmsg 的事务编号,状态为 RocketMQLocalTransactionState.UNKNOWN 的记录。
  • 第二步,调用带有事务的业务 Service 的方法。在该 Service 方法中,在逻辑都执行成功的情况下,更新 idmsg 的事务编号,状态变更为 RocketMQLocalTransactionState.COMMIT 。这样,我们就可以伴随这个事务的提交,更新 idmsg 的事务编号的记录的状为 RocketMQLocalTransactionState.COMMIT ,美滋滋。。
  • 第三步,要以 try-catch 的方式,调用业务 Service 的方法。如此,如果发生异常,回滚事务的时候,可以在 catch 中,更新 idmsg 的事务编号的记录的状态为 RocketMQLocalTransactionState.ROLLBACK 。极端情况下,可能更新失败,则打印 error 日志,告警知道,人工介入。
  • 如此三步之后,我们在 #executeLocalTransaction(...) 方法中,就可以通过查找数据库,idmsg 的事务编号的记录的状态,然后返回。

相比来说,倾向第二种,实现更加简单通用,对于业务开发者,更加友好。和有几个朋友沟通了下,他们也是采用第二种。

7.4 Demo07Consumer

创建 [Demo03Consumer]类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo07Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo07Message.TOPIC,
        consumerGroup = "demo07-consumer-group-" + Demo07Message.TOPIC
)
public class Demo07Consumer implements RocketMQListener<Demo07Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo07Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}

7.5 简单测试

创建 [Demo07ProducerTest]测试类,编写单元测试方法,调用 Demo07Producer 发送事务消息的方式。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo07ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo07Producer producer;

    @Test
    public void testSendMessageInTransaction() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.sendMessageInTransaction(id);
        logger.info("[testSendMessageInTransaction][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

}

我们来执行 #testSendMessageInTransaction() 方法,测试发送事务消息。控制台输出如下:

# TransactionListenerImpl 执行 executeLocalTransaction 方法,先执行本地事务的逻辑
2020-08-04 22:58:36.889  INFO 1056 --- [           main] p.Demo07Producer$TransactionListenerImpl : [executeLocalTransaction][执行本地事务,消息:GenericMessage [payload=byte[17], headers={rocketmq_TOPIC=DEMO_07, rocketmq_FLAG=0, id=6a603867-a873-3416-efcb-4c3cab1a828f, contentType=application/json, rocketmq_TRANSACTION_ID=240884E30114A66731EF8A0EAAFD768A042018B4AAC214613C4C0000, timestamp=1596553116889}] arg:1596553116]
# Producer 发送事务消息成功,但是因为 executeLocalTransaction 方法返回的是 UNKOWN 状态,所以事务消息并未提交或者回滚
2020-08-04 22:58:36.889  INFO 1056 --- [           main] c.e.s.r.producer.Demo07ProducerTest      : [testSendMessageInTransaction][发送编号:[1596553116] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A042018B4AAC214613C4C0000, offsetMsgId=null, messageQueue=MessageQueue [topic=DEMO_07, brokerName=broker-a, queueId=0], queueOffset=0]]]
# RocketMQ Broker 在发送事务消息 30 秒后,发现事务消息还未提交或是回滚,所以回查 Producer 。此时,checkLocalTransaction 方法返回 COMMIT ,所以该事务消息被提交
2020-08-04 22:58:50.723  INFO 6816 --- [pool-1-thread-1] p.Demo07Producer$TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=0, rocketmq_TOPIC=DEMO_07, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=6585E30D00002A9F000000000003BE8A, TRANSACTION_CHECK_TIMES=1, rocketmq_TRANSACTION_ID=240884E30114A66731EF8A0EAAFD768A042018B4AAC214613C4C0000, rocketmq_SYS_FLAG=0, id=00d49ebc-cd12-2202-3f18-63a831336c81, rocketmq_BORN_HOST=202.99.106.26, contentType=application/json, rocketmq_BORN_TIMESTAMP=1596553116826, timestamp=1596553130723}]]
# 事务消息被提交,所以该消息被 Consumer 消费
2020-08-04 22:58:52.318  INFO 1056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo07Consumer   : [onMessage][线程编号:180 消息内容:Demo07Message{id=1596553116}]

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

相关文章

网友评论

    本文标题:RocketMQ 与 Spring Boot整合(七、事务消息)

    本文链接:https://www.haomeiwen.com/subject/cepgrktx.html