美文网首页
分布式事务&seata

分布式事务&seata

作者: shoyu666 | 来源:发表于2023-01-15 10:47 被阅读0次

    微服务中的分布式事务问题

    在单体应用中,3个子模块(库存stock,订单order,账户account),数据的一致性是由数据库保证。

    image.png
    但是当场景是微服务的场景时
    image.png
    库存,订单,账户只能保证各自的数据一致性,无法保证全局的数据一致性。

    分布式事务seata

    分布式事务的发展过程中,有一些著名的理论基础如二阶段提交协议(2pc),协议把分布式事务的过程分为2个阶段进行。
    基于2pc的分布式事务模型有XA、AT、TCC、SAGA,他们的共同点都是基于2pc协议,同时各自有各自的特点和适用场景。
    那么seata是什么,他跟2pc以及XA、AT、TCC、SAGA的关系?
    Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。本质上,seata是打包实现了 XA、AT、TCC、SAGA模型,同时支持Dubbo、Spring Cloud、Sofa-RPC、Motan 和 gRPC 等RPC框架的便捷接入,高可用等。
    也就是2pc以及XA、AT、TCC、SAGA偏协议和理论。
    seata基于协议和理论给出了具体实用的解决方案。
    其中AT模式的方便易用,代码无侵入的特点被广泛使用。

    seata解决方案图示:


    image.png

    从图中看出,seata整个分布式事务分为3个角色(RM,TM,TC)。
    RM,TM是内嵌于微服务中(或者说内嵌于客户端中),TC是独立部署的一个服务(相当于服务端)。

    • TC (Transaction Coordinator) - 事务协调者
      维护全局和分支事务的状态,驱动全局事务提交或回滚。

    • TM (Transaction Manager) - 事务管理器
      定义全局事务的范围:开始全局事务、提交或回滚全局事务。

    • RM (Resource Manager) - 资源管理器
      管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    seata的-全局事务是由很多分支事务组成,一般来说,分支事务就是本地事务


    image.png

    seata的快速使用

    用例

    用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

    仓储服务:对给定的商品扣除仓储数量。
    订单服务:根据采购需求创建订单。
    帐户服务:从用户帐户中扣除余额。

    架构图
    image.png
    仓储服务
    public interface StorageService {
    
        /**
         * 扣除存储数量
         */
        void deduct(String commodityCode, int count);
    }
    
    订单服务
    public interface OrderService {
    
        /**
         * 创建订单
         */
        Order create(String userId, String commodityCode, int orderCount);
    }
    
    帐户服务
    public interface AccountService {
    
        /**
         * 从用户账户中借出
         */
        void debit(String userId, int money);
    }
    
    主要业务逻辑
    public class BusinessServiceImpl implements BusinessService {
    
        private StorageService storageService;
    
        private OrderService orderService;
    
        /**
         * 采购
         */
        public void purchase(String userId, String commodityCode, int orderCount) {
    
            storageService.deduct(commodityCode, orderCount);
    
            orderService.create(userId, commodityCode, orderCount);
        }
    }
    
    public class OrderServiceImpl implements OrderService {
    
        private OrderDAO orderDAO;
    
        private AccountService accountService;
    
        public Order create(String userId, String commodityCode, int orderCount) {
    
            int orderMoney = calculate(commodityCode, orderCount);
    
            accountService.debit(userId, orderMoney);
    
            Order order = new Order();
            order.userId = userId;
            order.commodityCode = commodityCode;
            order.count = orderCount;
            order.money = orderMoney;
    
            // INSERT INTO orders ...
            return orderDAO.insert(order);
        }
    }
    
    seata 的分布式交易解决方案
    image.png

    我们只需要使用一个 @GlobalTransactional 注解在业务方法上:

    
        @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount) {
            ......
        }
    
    dubbo中的seata

    Business发起全局事务,那么Storage,Order,Account是如何感知的,或者说他们是如何串在一起的。
    这里涉及一个xid的概念,xid是分布式全局事务的唯一id,所以只要Storage,Order,Account绑定的都是同一个xid就能保证他们是在同一个全局事务中,所以只要保证xid在链路上的传递就可以了。
    在dubbo中,xid的传递是通过Fitter实现的。

    /**
     * The type Transaction propagation filter.
     */
    @Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
    public class TransactionPropagationFilter implements Filter {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            String xid = RootContext.getXID(); // 获取当前事务 XID
            String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 获取 RPC 调用传递过来的 XID
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
            }
            boolean bind = false;
            if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
                RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
            } else {
                if (rpcXid != null) { // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
                    RootContext.bind(rpcXid);
                    bind = true;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                    }
                }
            }
            try {
                return invoker.invoke(invocation); // 业务方法的调用
    
            } finally {
                if (bind) { // Provider:调用完成后,对 XID 的清理
                    String unbindXid = RootContext.unbind();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                    }
                    if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                        LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                        if (unbindXid != null) { // 调用过程有新的事务上下文开启,则不能清除
                            RootContext.bind(unbindXid);
                            LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                        }
                    }
                }
            }
        }
    }
    
    参考

    seata github
    seata官方文档

    相关文章

      网友评论

          本文标题:分布式事务&seata

          本文链接:https://www.haomeiwen.com/subject/isfacdtx.html