美文网首页
分布式事务 —— 本地消息表

分布式事务 —— 本地消息表

作者: 放开那个BUG | 来源:发表于2021-02-04 22:06 被阅读0次

    1、前言

    本地消息表是最简单实用的分布式事务方案,基本上所知的分布式事务场景,都可以拆成本地消息表的方式执行,最后达到最终一致性。

    无论是 TCC 事务还是本地消息表(TCC 我还没看过一个实际的开源中间件),都依赖本地数据库事务(至少本地消息表是这样的),它将分布式事务拆分成一个个依赖于本地的数据库事务,然后以异步的方式达到最终一致性。

    2、设计

    本地消息表的分布式事务如图所示:


    image.png

    需要注意的是:消费方注意幂等性问题。

    比如我们以传统的订单下发、扣除库存为例(下面代码中为了简略使用了一个订单消息表,实际上为了通用,应该设计通用的消息表,以类型区分是什么消息):

    • 1.插入订单记录时,保存一条消息到本地消息表中,状态字段变为未发送,保证原子性。然后将消发送到 MQ(提供方要确保 插入订单记录 和 插入订单本地消息表 要在同一事务下面。)
    • 2.消息发送成功,执行Confirm的回调,修改消息表中的状态为已发送。
    • 3.消费方采用手动ack机制+重试机制,接收到消息后先进行幂等性判断,然后在处理自己的逻辑。
      消费方异常,本地事务回滚,触发重试,重试失败,ack无法签收,本地消息表状态也无法修改, 保证一致性。
    • 4.为了避免订单系统消息发送不成功的情况,可以提供一个补偿的定时任务,轮询扫描订单的本地消息表,找出所有未发送的记录,进行消息发送。

    订单(发送方)代码:

    package com.tang.order.service;
    
    import com.tang.order.client.ClientStock;
    import com.tang.order.domain.Orders;
    import com.tang.order.domain.OrdersMessage;
    import com.tang.order.mapper.OrdersMapper;
    import com.tang.order.mapper.OrdersMessageMapper;
    import com.tang.order.utils.MapperUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 创建订单、订单消息、发生消息到 MQ
     */
    @Slf4j
    @Service
    public class OrderService {
    
        @Resource
        private ClientStock clientStock;
    
        @Resource
        private OrdersMapper ordersMapper;
    
        @Resource
        private OrdersMessageMapper ordersMessageMapper;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 插入订单记录  和 订单消息表 要保证原子性
         * @param orderId
         * @return
         */
        @Transactional(rollbackFor = Exception.class)
        public boolean createOrder(String orderId) {
    
            Orders orders = new Orders();
            orders.setName("蚂蚁课堂");
            orders.setOrderCreatetime(new Date());
            orders.setOrderState(0);
            orders.setOrderMoney(10.0);
            orders.setOrderId(orderId);
    
            // 添加订单记录
            int flage = ordersMapper.insert(orders);
    
            OrdersMessage ordersMessage = null;
            if (flage == 1) {
    
                ordersMessage = new OrdersMessage();
    
                ordersMessage.setId(0);
    
                Map<String, Object> map = new HashMap<>();
    
                map.put("orderId", orderId);
    
                // 消息体的内容
                ordersMessage.setMessageBody(MapperUtils.ObjectToJson(map));
                // 状态为0表示,消息未发送
                ordersMessage.setStatus(0);
                ordersMessage.setCreateTime(new Date());
    
                flage = ordersMessageMapper.insert(ordersMessage);
    
                if (flage != 1) {
                    throw new RuntimeException("回滚事务!");
                }
            }
    
            // 直接发送
            if(flage == 1 && ordersMessage != null){
                // 注册回调函数
                this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String s) {
                        log.info("消息是否发送成功--->" + ack);
                        // 消息发送成功了
                        if (ack) {
                            log.info("消息发送成功,更改消息表中的状态....");
                            // 更改消息表的状态为已发送
                            String id = correlationData.getId();
    
                            OrdersMessage ordersMessage = new OrdersMessage();
                            ordersMessage.setId(Integer.valueOf(id));
                            ordersMessage.setStatus(1);
    
                            ordersMessageMapper.updateByPrimaryKeySelective(ordersMessage);
                        }
                    }
                });
                this.rabbitTemplate.setMandatory(true);
    
                this.rabbitTemplate.convertAndSend("stockExchange","direct.key",ordersMessage.getMessageBody(),
                        new CorrelationData(ordersMessage.getId() + ""));
    
                return true;
            }
    
            return false;
        }
    }  
    

    定时任务补偿(时间频率可以设长一些):

    package com.tang.order.schedule;
    
    import com.tang.order.domain.OrdersMessage;
    import com.tang.order.mapper.OrdersMessageMapper;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import tk.mybatis.mapper.entity.Example;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    /**
     *
     * 定时任务补偿,频次不要太高,主要是为了补偿发 MQ 失败的
     */
    @Slf4j
    @Component
    public class MqSchedule{
    
        @Resource
        private OrdersMessageMapper ordersMessageMapper;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Scheduled(cron = "0/2 * * * * ?")
        public void scheduleScanLocalMessage() {
    
            log.info("扫描消息表开始...."+System.currentTimeMillis());
    
            // 查询所有等待发送的消息
            Example example = new Example(OrdersMessage.class);
    
            example.createCriteria().andEqualTo("status", 0);
    
            List<OrdersMessage> ordersMessages = ordersMessageMapper.selectByExample(example);
    
            ordersMessages.forEach(item -> {
    
                log.info("开始发送本地消息表里面的数据.........");
    
                // 注册回调函数
                this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String s) {
                        log.info("消息是否发送成功--->" + ack);
                        // 消息发送成功了
                        if (ack) {
                            log.info("消息发送成功,更改消息表中的状态....");
                            // 更改消息表的状态为已发送
                            String id = correlationData.getId();
    
                            OrdersMessage ordersMessage = new OrdersMessage();
                            ordersMessage.setId(Integer.valueOf(id));
                            ordersMessage.setStatus(1);
    
                            ordersMessageMapper.updateByPrimaryKeySelective(ordersMessage);
                        }
                    }
                });
                this.rabbitTemplate.setMandatory(true);
    
                this.rabbitTemplate.convertAndSend("stockExchange","direct.key",item.getMessageBody(),
                        new CorrelationData(item.getId() + ""));
    
            });
    
        }
    
    
    
    }
    
    

    库存(消费方)代码:

    package com.tang.stock.listener;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.Channel;
    import com.tang.stock.domain.Stock;
    import com.tang.stock.mapper.StockMapper;
    import com.tang.stock.utils.MapperUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import tk.mybatis.mapper.entity.Example;
    
    import javax.annotation.Resource;
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * 库存,消费 MQ 消息
     */
    @Component
    @Slf4j
    public class ListenerMessage {
    
        @Resource
        private StockMapper stockMapper;
    
        @RabbitListener(bindings = {
                @QueueBinding(
                        exchange = @Exchange(value = "stockExchange"),
                        value = @Queue(value = "stockQueue"),
                        key = "direct.key"
                )
        })
        @Transactional(rollbackFor = Exception.class)
        public void listenerStock(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel)
                throws IOException {
    
            log.info("接收到消息,进行手动签收...接收到的消息内容为----->" + messageBody);
    
            Map map = MapperUtils.JsonToObject(messageBody, Map.class);
    
            String orderId = map.get("orderId").toString();
    
            // 考虑幂等性
            Stock stock = new Stock();
            stock.setOrderId(orderId);
    
            Example example = new Example(Stock.class);
            example.createCriteria()
                    .andEqualTo("orderId", orderId);
    
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    
            // 重复消费问题
            if (stockMapper.selectCountByExample(example) > 0) {
                log.info("重复消费触发了....");
                try {
    
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    try {
                        channel.basicNack(deliveryTag, false, false);
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
    
                return;
            }
    
            stock.setStock(100);
    
            // 新增一条减库存记录
            int flag = stockMapper.insert(stock);
    
    //        if (flag == 1) {
    //            throw new RuntimeException("手动错误开启....");
    //        }
    
            if (flag == 1) {
                log.info("新增一条减库存记录成功.....分布式事务完成....");
                try {
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    try {
                        channel.basicNack(deliveryTag, false, false);
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
            }
    
        }
    
    }
    

    3、结论

    很简单,依靠本地事务 + 消息表。包括 TCC 也是需要依靠本地事务,所以分布式事务不可能脱离本地事务而存在,毕竟各个微服务系统都是依靠本地事务的。

    我还想说明的是,本地消息表使用有一定的局限性,它只适合多个数据库操作有联系的情况下来做,比如一个事务需要操作

    4、参考资料

    https://juejin.cn/post/6844904041659498509

    相关文章

      网友评论

          本文标题:分布式事务 —— 本地消息表

          本文链接:https://www.haomeiwen.com/subject/rklbtltx.html