美文网首页
分布式事务

分布式事务

作者: 知止9528 | 来源:发表于2021-01-09 09:48 被阅读0次

    本地事务

    在计算机系统中,更多的是通过关系型数据库来控制事务,这是利用数据库本身的事务特性来实现的,因此叫数据库事务,由于应用主要靠关系数据库来控制事务,而数据库通常和应用在同一个服务器,所以基于关系型数据库的事务又被称为本地事务。


    回顾一下数据库事务的四大特性 ACID:
    A(Atomic):原子性,构成事务的所有操作,要么都执行完成,要么全部不执行,不可能出现部分成功部分失
    败的情况。
    C(Consistency):一致性,在事务执行前后,数据库的一致性约束没有被破坏。比如:张三向李四转100元,
    转账前和转账后的数据是正确状态这叫一致性,如果出现张三转出100元,李四账户没有增加100元这就出现了数
    据错误,就没有达到一致性。
    I(Isolation):隔离性,数据库中的事务一般都是并发的,隔离性是指并发的两个事务的执行互不干扰,一个事
    务不能看到其他事务运行过程的中间状态。通过配置事务隔离级别可以避脏读、重复读等问题。
    D(Durability):持久性,事务完成之后,该事务对数据的更改会被持久化到数据库,且不会被回滚。
    数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作
    要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚


    分布式事务

    要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务

    区分

    本地事务如下

    begin transaction;
    //1.本地数据库操作:张三减少金额
    //2.本地数据库操作:李四增加金额
    commit transation;
    

    分布式事务如下

    begin transaction;
    //1.本地数据库操作:张三减少金额
    //2.远程调用:让李四增加金额
    commit transation;
    

    设想,当远程调用让李四增加金额成功了,由于网络问题远程调用并没有返回,此时本地事务提交失败就回滚了张三减少金额的操作,此时张三和李四的数据就不一致了。


    分布式的基本理论

    如上,网络因素成为了分布式事务的考量标准之一
    CAP理论
    CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。

    示例:


    image.png

    整体执行流程如下:
    1、商品服务请求主数据库写入商品信息(添加商品、修改商品、删除商品)
    2、主数据库向商品服务响应写入成功。
    3、商品服务请求从数据库读取商品信息。

    C - Consistency:
    一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上,从任意结点读取到的数据都是最新的状态。
    上图中,商品信息的读写要满足一致性就是要实现如下目标:
    1、商品服务写入主数据库成功,则向从数据库查询新数据也成功。
    2、商品服务写入主数据库失败,则向从数据库查询新数据也失败。

    如何实现一致性?
    1、写入主数据库后要将数据同步到从数据库。
    2、写入主数据库后,在向从数据库同步期间要将从数据库锁定,待同步完成后再释放锁,以免在新数据写入成功
    后,向从数据库查询到旧的数据。

    总结分布式系统一致性的特点如下:
    1、由于存在数据同步的过程,写操作的响应会有一定的延迟。
    2、为了保证数据一致性会对资源暂时锁定,待数据同步完成释放锁定资源。
    3、如果请求数据同步失败的结点则会返回错误信息,一定不会返回旧数据。


    A - Availability :
    可用性是指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。

    上图中,商品信息读取满足可用性就是要实现如下目标:
    1、从数据库接收到数据查询的请求则立即能够响应数据查询结果。
    2、从数据库不允许出现响应超时或响应错误。

    如何实现可用性?
    1、写入主数据库后要将数据同步到从数据库。
    2、由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定。
    3、即时数据还没有同步过来,从数据库也要返回要查询的数据,哪怕是旧数据,如果连旧数据也没有则可以按照约定返回一个默认信息,但不能返回错误或响应超时。

    总结分布式系统可用性的特点如下:
    1、 所有请求都有响应,且不会出现响应超时或响应错误。


    P - Partition tolerance :
    通常分布式系统的各各结点部署在不同的子网,这就是网络分区,不可避免的会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这叫分区容忍性。

    上图中,商品信息读写满足分区容忍性就是要实现如下目标:
    1、主数据库向从数据库同步数据失败不影响读写操作。
    2、其一个结点挂掉不影响另一个结点对外提供服务。

    如何实现分区容忍性?
    1、尽量使用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据,这样结点之间能有效的实现
    松耦合。
    2、添加从数据库结点,其中一个从结点挂掉其它从结点提供服务。

    总结分布式分区容忍性的特点如下:
    1、分区容忍性分是布式系统具备的基本能力。


    结论:

    在所有分布式事务场景中不会同时具备CAP三个特性,因为在具备了P的前提下C和A是不能共存的。

    如果要实现C则必须保证数据一致性,在数据同步的时候为防止向从数据库查询不一致的数据则需要将从数据库数据锁定,待同步完成后解锁,如果同步失败从数据库要返回错误信息或超时信息。

    如果要实现A则必须保证数据可用性,不管任何时候都可以向从数据查询数据,则不会响应超时或返回错误信息。

    通过分析发现在满足P的前提下C和A存在矛盾性。

    CAP有哪些组合方式?

    (1)AP
    放弃一致性,追求分区容忍性和可用性。这是很多分布式系统设计时的选择。
    例如:
    上边的商品管理,完全可以实现AP,前提是只要用户可以接受所查询的到数据在一定时间内不是最新的即可。
    通常实现AP都会保证最终一致性,后面讲的BASE理论就是根据AP来扩展的,一些业务场景 比如:订单退款,今日退款成功,明日账户到账,只要用户可以接受在一定时间内到账即可。

    (2)CP
    放弃可用性,追求一致性和分区容错性
    zookeeper其实就是追求的强一致,又比如跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。

    (3)CA
    放弃分区容忍性,即不进行分区,不考虑由于网络不通或结点挂掉的问题,则可以实现一致性和可用性。那么系统将不是一个标准的分布式系统,我们最常用的关系型数据就满足了CA。


    BASE理论

    1.强一致性和最终一致性
    CAP理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。
    其中AP在实际应用中较多,AP即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性,比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,这种一致性和CAP中的一致性不同。
    CAP中的一致性要求在任何时间查询每个结点数据都必须一致,它强调的是强一致性,
    最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。

    2.BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写

    BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。

    说明:
    基本可用:分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。如,电商网站交易付款出现问题了,商品依然可以正常浏览。
    软状态:由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用性,如订单的"支付中"、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。
    最终一致:最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。


    以理论为基础,针对不同的分布式场景业界常见的解决方案有2PC,###TCC、可靠消息最终一致性、最大努力通知这几种。


    分布式事务解决方案之2PC(两阶段提交)

    2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。

    举例:张三和李四好久不见,老友约起聚餐,饭店老板要求先买单,才能出票。这时张三和李四分别抱怨近况不如意,囊中羞涩,都不愿意请客,这时只能AA。只有张三和李四都付款,老板才能出票安排就餐。但由于张三和李四都是铁公鸡,形成了尴尬的一幕:
    准备阶段:老板要求张三付款,张三付款。老板要求李四付款,李四付款。
    提交阶段:老板出票,两人拿票纷纷落座就餐。

    例子中形成了一个事务,若张三或李四其中一人拒绝付款,或钱不够,店老板都不会给出票,并且会把已收款退回。

    整个事务过程由事务管理器和参与者组成,店老板就是事务管理器,张三、李四就是事务参与者,事务管理器负责决策整个分布式事务的提交和回滚,事务参与者负责自己本地事务的提交和回滚。

    在计算机中部分关系数据库如Oracle、MySQL支持两阶段提交协议,如下:

    1. 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo日志,此时事务没有提交。
      (Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据文件)
    2. 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。
      注意:必须在最后阶段释放锁资源。

    整个2PC的事务流程涉及到三个角色AP、RM、TM。AP指的是使用2PC分布式事务的应用程序;RM指的是资源管理器,它控制着分支事务;TM指的是事务管理器,它控制着整个全局事务。
    1)在准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;
    2)在提交阶段TM会接受RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放。

    2PC方案,资源锁需要等到两个阶段结束才释放,性能较差。


    Seata方案

    Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。

    传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服
    务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

    Seata的设计思想如下:
    Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。

    Seata实现2PC与传统2PC的差别:

    1. 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而Seata的 RM 是以jar包的形式作为中间件层部署在应用程序这一侧的。
    2. 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。

    示例1:如新用户注册送积分


    image.png

    具体的执行流程如下:

    1. 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
    2. 用户服务的 RM 向 TC 注册 分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖。
    3. 用户服务执行分支事务,向用户表插入一条记录。
    4. 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。
    5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
    6. 用户服务分支事务执行完毕。
    7. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
    8. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
    image.png

    实现方案细节

    1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log。
    2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。
    3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
    4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
    5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作。

    代码实现

    dtx-seata-demo-bank1
    dtx-seata-demo-bank1实现如下功能:
    1、张三账户减少金额,开启全局事务。
    2、远程调用bank2向李四转账。
    (1)DAO

    @Mapper
    @Component
    public interface AccountInfoDao {
    //更新账户金额
    @Update("update account_info set account_balance = account_balance + #{amount} where
    account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double
    amount);
    }
    
    

    (2)FeignClient
    远程调用bank2的客户端

    @FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class)
    public interface Bank2Client {
    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
    }
    
    @Component
    public class Bank2ClientFallback implements Bank2Client{
    @Override
    public String transfer(Double amount) {
    return "fallback";
    }
    }
    

    (3)Service

    @Service
    public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
    @Autowired
    AccountInfoDao accountInfoDao;
    @Autowired
    Bank2Client bank2Client;
    //张三转账
    @Override
    @GlobalTransactional
    @Transactional
    public void updateAccountBalance(String accountNo, Double amount) {
    logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
    //张三扣减金额
    accountInfoDao.updateAccountBalance(accountNo,amount*‐1);
    //向李四转账
    String remoteRst = bank2Client.transfer(amount);
    //远程调用失败
    if(remoteRst.equals("fallback")){
    throw new RuntimeException("bank1 下游服务异常");
    }
    //人为制造错误
    if(amount==3){
    throw new RuntimeException("bank1 make exception 3");
    }
    }
    }
    

    将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务:
    GlobalTransactionalInterceptor会拦截@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。
    在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。

    (6)Controller

    @RestController
    public class Bank1Controller {
    @Autowired
    AccountInfoService accountInfoService;
    //转账
    @GetMapping("/transfer")
    public String transfer(Double amount){
    accountInfoService.updateAccountBalance("1",amount);
    return "bank1"+amount;
    }
    }
    

    dtx-seata-demo-bank2
    dtx-seata-demo-bank2实现如下功能:
    1、李四账户增加金额。
    dtx-seata-demo-bank2在本账号事务中作为分支事务不使用@GlobalTransactional。

    (1)DAO

    @Mapper
    @Component
    public interface AccountInfoDao {
    //向李四转账
    @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE
    account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double
    amount);
    }
    

    (2)Service

    @Service
    public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
    @Autowired
    AccountInfoDao accountInfoDao;
    @Override
    @Transactional
    public void updateAccountBalance(String accountNo, Double amount) {
    logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
    //李四增加金额
    accountInfoDao.updateAccountBalance(accountNo,amount);
    //制造异常
    if(amount==2){
    throw new RuntimeException("bank1 make exception 2");
    }
    }
    }
    

    (3)Controller

    @RestController
    public class Bank2Controller {
    @Autowired
    AccountInfoService accountInfoService;
    @GetMapping("/transfer")
    public String transfer(Double amount){
    accountInfoService.updateAccountBalance("2",amount);
    return "bank2"+amount;
    }
    }
    

    小结

    可以看出适用seata还是比较简单的,Seata实现2PC的两种2PC方案,由于Seata的0侵入性并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。
    Seata实现2PC要点:
    1、全局事务开始使用 @GlobalTransactional标识 。
    2、每个本地事务方案仍然使用@Transactional标识。
    3、每个数据都需要创建undo_log表,此表是seata保证本地事务一致性的关键。但也带来了性能损耗,需要考虑场景


    分布式事务解决方案之TCC

    TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认Confirm、撤销Cancel。
    Try操作做业务检查及资源预留,
    Confirm做业务确认操作,
    Cancel实现一个与Try相反的操作即回滚操作。

    详细说明

    1. Try 阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm 一起才能真正构成一个完整的业务逻辑。
    2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC则认为 Confirm阶段是不会出错的。
      即假设:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引
      入重试机制或人工处理。
    3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。

    TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:

    空回滚:
    在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。
    出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
    解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。


    幂等:
    通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
    解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。


    悬挂:
    悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。
    出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。
    解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。


    目前市面上的TCC框架众多比如下面这几种

    框架名称 Gitbub地址
    tcc-transaction https://github.com/changmingxie/tcc-transaction
    Hmily https://github.com/yu199195/hmily
    ByteTCC https://github.com/liuyangming/ByteTCC
    EasyTransaction https://github.com/QNJR-GROUP/EasyTransaction

    虽然Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。

    小结

    如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突提高吞吐量成为可能。
    而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。


    分布式事务解决方案之可靠消息最终一致性

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

    image.png

    可靠消息最终一致性方案要解决以下几个问题:
    1.本地事务与消息发送的原子性问题
    本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。

    2、事务参与方接收消息的可靠性
    事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

    3、消息重复消费的问题
    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。
    要解决消息重复消费的问题就要实现事务参与方的方法幂等性


    解决方案

    方案一:本地消息表方案

    此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

    示例:
    用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。


    image.png

    交互流程如下:
    1、用户注册
    用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致)
    伪代码

    begin transaction;
    //1.新增用户
    //2.存储积分消息日志
    commit transation;
    

    这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原
    子性。

    2、定时任务扫描日志
    如何保证将消息发送给消息队列呢?
    经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。

    3、消费消息
    如何保证消费者一定能消费到消息呢?
    这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重
    试向消费者来发送消息。

    积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。
    由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。


    方案二:RocketMQ事务消息方案

    image.png

    执行流程如下:
    还以注册送积分的例子来描述 整个流程。
    Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

    1、Producer 发送事务消息
    Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
    本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

    2、MQ Server回应消息发送成功MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

    3、Producer 执行本地事务
    Producer 端执行业务代码逻辑,通过本地数据库事务控制。
    本例中,Producer 执行添加用户操作。

    4、消息投递
    若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
    若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。
    MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

    5、事务回查
    如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

    以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

    RoacketMQ提供RocketMQLocalTransactionListener接口:

    public interface RocketMQLocalTransactionListener {
    /**
    ‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
    ‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
    ‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
    ‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
    */
    RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
    /**
    ‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
    ‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
    */
    RocketMQLocalTransactionState checkLocalTransaction(Message msg);
    }
    
    

    发送事务消息:
    以下是RocketMQ提供用于发送事务消息的API:

    TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    //设置TransactionListener实现
    producer.setTransactionListener(transactionListener);
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    

    小结

    可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RocketMQ作为消息中间件,RocketMQ主要解决了两个功能:
    1、本地事务与消息发送的原子性问题。
    2、事务参与方接收消息的可靠性。

    适用场景

    可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。


    分布式事务解决方案之最大努力通知

    充值示例:


    image.png

    交互流程:
    1、账户系统调用充值系统接口
    2、充值系统完成支付处理向账户系统发起充值结果通知
    若通知失败,则充值系统按策略进行重复通知
    3、账户系统接收到充值结果通知修改充值状态。
    4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

    通过上边的例子我们总结最大努力通知方案的目标:
    目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

    具体包括:
    1、有一定的消息重复通知机制。
    因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
    2、消息校对机制。
    如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。


    最大努力通知与可靠消息一致性的区别?

    1、解决方案思想不同
    可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证

    最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方

    2、两者的业务应用场景不同
    可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。

    最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

    3、技术解决方向不同
    可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。

    最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)

    实现方案一:
    采用MQ的ack机制就可以实现最大努力通知。


    image.png

    本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:
    1、发起通知方将通知发给MQ。
    使用普通消息机制将通知发给MQ。
    注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。
    2、接收通知方监听 MQ。
    3、接收通知方接收消息,业务处理完成回应ack。
    4、接收通知方若没有回应ack则MQ会重复通知。
    MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
    5、接收通知方可通过消息校对接口来校对消息的一致性。


    实现方案二
    案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:


    image.png

    交互流程如下:
    1、发起通知方将通知发给MQ。
    使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。
    2、通知程序监听 MQ,接收MQ的消息。
    方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。
    通知程序若没有回应ack则MQ会重复通知。
    3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。
    通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。
    4、接收通知方可通过消息校对接口来校对消息的一致性。

    方案1和方案2的不同点:
    1、方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。
    2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

    小结

    最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;
    最大努力通知方案需要实现如下功能:
    1、消息重复通知机制。
    2、消息校对机制。


    分布式事务综合分析

    以P2P的相关业务流程案例分析如下
    一. 用户注册为例


    image.png

    针对注册业务,如果用户与账号信息不一致,则会导致严重问题,因此该业务对一致性要求较为严格,即当用户服务和账号服务任意一方出现问题都需要回滚事务。

    1、采用可靠消息一致性方案
    可靠消息一致性要求只要消息发出,事务参与者接到消息就要将事务执行成功,不存在回滚的要求,所以不适用。

    2、采用最大努力通知方案
    最大努力通知表示发起通知方执行完本地事务后将结果通知给事务参与者,即使事务参与者执行业务处理失败发起通知方也不会回滚事务,所以不适用。

    3、采用Seata实现2PC
    在用户中心发起全局事务,统一账户服务为事务参与者,用户中心和统一账户服务只要有一方出现问题则全局事务回滚,符合要求。
    实现方法如下:
    1、用户中心添加用户信息,开启全局事务
    2、统一账号服务添加账号信息,作为事务参与者
    3、其中一方执行失败Seata对SQL进行逆操作删除用户信息和账号信息,实现回滚。

    4、采用TCC
    TCC也可以实现用户中心和统一账户服务只要有一方出现问题则全局事务回滚,符合要求。
    实现方法如下:
    1、用户中心
    try:添加用户,状态为不可用
    confirm:更新用户状态为可用
    cancel:删除用户
    2、统一账号服务
    try:添加账号,状态为不可用
    confirm:更新账号状态为可用
    cancel:删除账号


    二. 存管开户案例
    P2P业务必须让银行存管资金,用户的资金在银行存管系统的账户中,而不在P2P平台中,因此用户要在银行存管系统开户。


    image.png

    用户向用户中心提交开户资料,用户中心生成开户请求号并重定向至银行存管系统开户页面。用户设置存管密码并确认开户后,银行存管立即返回“请求已受理”。在某一时刻,银行存管系统处理完该开户请求后,将调用回调地址通知处理结果,若通知失败,则按一定策略重试通知。同时,银行存管系统应提供开户结果查询的接口,供用户中
    心校对结果。

    分析:
    P2P平台的用户中心与银行存管系统之间属于跨系统交互,银行存管系统属于外部系统,用户中心无法干预银行存管系统,所以用户中心只能在收到银行存管系统的业务处理结果通知后积极处理,开户后的使用情况完全由用户中心来控制。

    1、采用Seata实现2PC
    需要侵入银行存管系统的数据库,由于它的外部系统,所以不适用。

    2、采用Hmily实现TCC
    TCC侵入性更强,所以不适用。

    3、基于MQ的可靠消息一致性
    如果让银行存管系统监听 MQ则不合适 ,因为它的外部系统。
    如果银行存管系统将消息发给MQ用户中心监听MQ是可以的,但是由于相对银行存管系统来说用户中心属于外部系统,银行存管系统是不会让外部系统直接监听自己的MQ的,基于MQ的通信协议也不方便外部系统间的交互,所以本方案不合适。

    4、最大努力通知方案
    银行存管系统内部使用MQ,银行存管系统处理完业务后将处理结果发给MQ,由银行存管的通知程序专门发送通知,并且采用互联网协议通知给第三方系统(用户中心)。

    下图中发起通知即银行存管系统:


    image.png

    三. 满标审核案例
    业务流程
    在借款人标的募集够所有的资金后,P2P运营管理员审批该标的,触发放款,并开启还款流程。


    image.png

    管理员对某标的满标审批通过,交易中心修改标的状态为“还款中”,同时要通知还款服务生成还款计划。

    解决方案分析
    生成还款计划是一个执行时长较长的业务,不建议阻塞主业务流程,此业务对一致性要求较低。
    1、采用Seata实现2PC
    Seata在事务执行过程会进行数据库资源锁定,由于事务执行时长较长会将资源锁定较长时间,所以不适用。

    2、采用TCC
    本需求对业务一致性要求较低,因为生成还款计划的时长较长,所以不要求交易中心修改标的状态为“还款中”就立即生成还款计划 ,所以本方案不适用。

    3、基于MQ的可靠消息一致性
    满标审批通过后由交易中心修改标的状态为“还款中”并且向还款服务发送消息,还款服务接收到消息开始生成还款计划,基本于MQ的可靠消息一致性方案适用此场景 。

    4、最大努力通知方案
    满标审批通过后由交易中心向还款服务发送通知要求生成还款计划,还款服务并且对外提供还款计划生成结果校对接口供其它服务查询,最大努力通知方案也适用本场景 。


    各种方案的优缺点:

    2PC 最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高, 因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长 (long-running transactions) 的分布式服务中。

    拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。典型的使用场景:满,登录送优惠券等。

    可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注册送积分,登录送优惠券等。

    最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果通知等。

    无论是数据库层的XA、还是应用层TCC、可靠消息、最大努力通知等方案,都没有完美解决分布式事务问题,它们不过是各自在性能、一致性、可用性等方面做取舍,寻求某些场景偏好下的权衡。

    相关文章

      网友评论

          本文标题:分布式事务

          本文链接:https://www.haomeiwen.com/subject/gixcaktx.html