美文网首页
分布式事务设计

分布式事务设计

作者: dylan丶QAQ | 来源:发表于2020-11-09 11:27 被阅读0次

1. 对于分布式事务的整体理解

在传统业务中,我们的数据库都是单机,数据库本身就提供ACID,由于业务量激增我们将数据分到多个库中,对数据库进行分库分表设计,一个逻辑表数据被分到多个数据库中,这个时候单个库的ACID就无法管理两个数据库了

这个时候就需要理解和掌握分布式事务的概念了

  • CAP原理

  • ACID原理和BASE原理

  • 基于XA协议的两阶段提交

  • 事务的补偿机制

    • 理解起来非常简单,但实现比较复杂

    • 操作多个数据库的时候,一个成功一个失败,成功的数据库就需要有一个补偿接口来进行成功操作的回滚,让数据恢复到初始状态

  • 基于本地消息的最终一致性

  • 基于MQ消息队列的最终一致性

2. CAP原理解析

  • C-Consistent,一致性。操作成功后,所有节点看到的数据都是一样的

  • A-Availability,可用性,服务全部一致可用,在规定时间内完成响应

  • P-Partition tolerance,分区容错性。指分布式系统在遇到某个节点故障后,仍能够对外提供服务

CAP原理指出:这3个指标不可能同时满足,最多满足其中两个

  • CA

  • CP

  • AP

我们之所以把系统架构设计成分布式系统,就是为了在某个节点不可用的情况,整个服务队外可用。这样就满足了P,如果不满足P,就一定不是一个分布式系统了

image-20200318224059218.png

A和B是两个数据节点,A向B同步数据,并且作为一个整体对外提供服务,首先满足P了

A和B同步的过程中如果同步不能保证,那么这个结构就是一个AP结构

如果我们满足C,也就是说client无论访问A还是B得到的结果都是一致的,如果A和B数据同步出现延迟,只能确保A和B数据同步完成后才能对外提供服务,这个时候A就保证不了,这就是一个CP结构

3. ACID原理和BASE原理

A:原子性

C:一致性

I:隔离性

D:持久性

ACID是强一致性,ACID在CAP的原理中保证的是CA,因此对于ACID也在向BASE转变

BASE是什么呢?

Basically Available(基本可用):分布式系统在出现故障后,允许损失部分可用性,保证核心可用,比如电商大促,服务降级的体现

Soft-state(软状态):允许系统存在中间状态,中间状态是不会影响系统整体的可用性,比如到达各个分片的数据同步状态,允许这些数据有一段时间不同步

Eventually consistent(最终一致):指系统中所有数据副本在经过一定时间后,最终达到一致的状态

BASE模型是ACID的反面,不同于ACID,BASE强调牺牲高一致性,从而获得可用性,数据允许一段时间不一致但最终一致就可以了

在分布式事务中都是依赖ACID或BASE模型实现的

  • 基于XA的两段式提交协议和事务补偿机制都是基于ACID模型的

  • 基于本地消息和基于MQ消息的最终一致性方式是基于BASE模型实现的

4. 分布式事务涉及的问题分析

单一数据库结构中

  • ACID的强事务类型
  • 所有数据表都在同一个数据库
  • 数据库的事务得到很好的支持

分布式架构中

  • 分布式系统中,业务拆成多个数据库
  • 多个独立的数据库之间无法统一事务
  • 有可能导致数据不一致
  • 举个例子
    • 一个下单操作,通过积分购物
    • 执行顺序:用户的积分扣减,订单生成,库存扣减
    • 这个三个业务不会在一个数据中,就不能保证事务统一
    • 任何一个环节出错,其他环节都不会回滚

解决分布式事务的方案:

  • 基于XA协议的两阶段:ACID的方式
  • 事务补偿机制TCC:ACID的方式
  • 基于本地消息表+定时任务的最终一致性:BASE方式
  • 基于MQ的最终一致性方式:BASE方式

5. 通过XA协议实现两阶段提交

  • XA是由X/Open组织提出的分布式事务规范
  • 整体是由一个事务管理器(TM)和多个资源管理器(RM)组成
    • RM一般就是指我们的数据库
    • TM相当于程序中的数据源
  • 提交分为两个阶段:prepare和commit

第一个阶段:prepare

image-20200321202031686.png

TM告诉所有的RM进行准备,如果都返回ok了,才进行commit,一旦在准备阶段出现任何问题都会返回错误给到TM,TM就会回滚并结束操作

第二阶段:commit

image-20200321202234509.png

到commit阶段如果第二个RM出现问题没有提交成功会返回一个状态到TM,而第一个RM已经完成的commit就不能回滚了,这个时候就需要人工介入了

XA协议的两阶段提交总结:

  • 保证数据的一致性
  • commit阶段如果出现问题,事务会不一致,需要人工处理
  • XA协议的效率比较低,性能与本地事务相比相差10倍
  • MySQL v5.7及以上版本均支持XA协议
  • mysql-connector-java驱动要v5.0以上
  • Java系统中,数据源采用Atomikos充当TM的角色

6. 使用Atomikos进行分布式事务

先准备环境:两台MySQLv5.7数据库

导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

数据源配置代码加上了TM管理器

package com.icodingedu.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.UserTransaction;

@Configuration
public class DB195Config {

    @Bean("db195")
    public DataSource db195(){
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUser("gavin");
        mysqlXADataSource.setPassword("123456");
        mysqlXADataSource.setUrl("jdbc:mysql://39.100.17.31:3306/user_195");

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        return atomikosDataSourceBean;
    }
        //这里就相当于一个TM,只加在一个config里就行
    @Bean("xaTransaction")
    public JtaTransactionManager jtaTransactionManager(){
        UserTransaction userTransaction = new UserTransactionImp();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        return new JtaTransactionManager(userTransaction,userTransactionManager);
    }
}

另一个数据源

package com.icodingedu.config;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DB197Config {

    @Bean("db197")
    public DataSource db197(){
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUser("gavin");
        mysqlXADataSource.setPassword("123456");
        mysqlXADataSource.setUrl("jdbc:mysql://39.100.19.243:3306/user_197");

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        return atomikosDataSourceBean;
    }
}

调用代码

package com.icodingedu.service;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;

@Service
public class DBService {

    @Transactional(transactionManager = "xaTransaction")
    public void insertTest(@Qualifier("db195")DataSource dataSource195,
                           @Qualifier("db197")DataSource dataSource197){
        JdbcTemplate jdbc195 = new JdbcTemplate(dataSource195);
        String sql1 = "insert into user_info(id,username) values(1,'gavin1')";
        int i = jdbc195.update(sql1);
        System.out.println("**************影响的行数:"+i);

        JdbcTemplate jdbc197 = new JdbcTemplate(dataSource197);
        String sql2 = "insert into user_info(id,username) values(1,'gavin1')";
        int i1 = jdbc197.update(sql2);
        System.out.println("**************影响的行数:"+i1);
    }
}

测试代码

import com.icodingedu.service.DBService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import javax.sql.DataSource;

@SpringBootTest
class DistributeTransactionsApplicationTests {

    @Autowired
    DBService dbService;

    @Test
    void contextLoads(@Qualifier("db195") DataSource dataSource195,
                      @Qualifier("db197")DataSource dataSource197) {
        dbService.insertTest(dataSource195,dataSource197);
    }
}

7. MyCat实现分布式事务

默认客户端是不进行分布式事务的,需要自己手动开启分布式事务

set autocommit=0;
set xa=on;
insert into user_info(id,username) values(1,'gavin1'),(5000001,'gavin2');
commit;

通过MyBatis实现客户端调用

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

POJO

package com.icodingedu.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserInfo {
    private int id;
    private String username;
}

mapper

package com.icodingedu.mapper;

import com.icodingedu.pojo.UserInfo;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

@Mapper
@Repository
public interface UserMapper {
    int addUser(UserInfo userInfo);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.UserMapper">
    <insert id="addUser" parameterType="com.icodingedu.pojo.UserInfo">
        insert into user_info(id,username) values(#{id},#{username})
    </insert>
</mapper>

service

package com.icodingedu.service;

import com.icodingedu.mapper.UserMapper;
import com.icodingedu.pojo.UserInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MyCatService {

    @Autowired
    UserMapper userMapper;

    @Transactional(rollbackFor = Exception.class)
    public void addUser(){
        UserInfo userInfo1 = new UserInfo();
        userInfo1.setId(1);
        userInfo1.setUsername("gavin1");
        userMapper.addUser(userInfo1);

        UserInfo userInfo2 = new UserInfo();
        userInfo2.setId(5000001);
        userInfo2.setUsername("gavin2");
        userMapper.addUser(userInfo2);
    }
}

测试

package com.icodingedu;

import com.icodingedu.service.MyCatService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MycatMybatisApplicationTests {

    @Autowired
    MyCatService myCatService;

    @Test
    void contextLoads() {
        myCatService.addUser();
    }

}

8. Sharding-Jdbc实现分布式事务

Sharding-Jdbc是自动支持分布式事务的,多个执行数据库的内容,只需要放在一个方法里,通过@Transactional(rollbackFor = Exception.class)设置即可实现

9. 事务补偿机制TCC分析

TCC分别对应Try、Confirm和Cancel三种操作实现的

  • 针对每个操作,都要提前注册一个与其对应的补偿(撤销)操作
  • 在执行失败后按照失败节向前补偿,撤销之前的操作
  • 示例:A给B转账,A和B在两家不同的银行
    • A减200,B加200
    • 两个操作都要保持原子性
    • 存在分布式事务
    • 并且是不同的两个公司
image-20200321215604676.png

注意的点:

  • 补偿接口也有可能出错
  • 需要循环调用补偿直到完成,循环次数需要计数,超过retry次数就记录并不再retry待人工处理

TCC的特点

  • 优点:逻辑清晰,流程简单
  • 缺点:数据的一致性比XA还要差,出错的点比较多

TCC属于应用层的一种补偿机制,需要大量的代码开发,对与开发人员要求比较高

10. 事务补偿机制代码实现

数据源config配置

DB195Config

package com.icodingedu.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.mysql.cj.jdbc.MysqlDataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.UserTransaction;

@Configuration
@MapperScan(value = "com.icodingedu.mapper.db195",sqlSessionFactoryRef = "factoryBean195")
public class DB195Config {

    @Bean("db195")
    public DataSource db195(){
        MysqlDataSource mysqlDataSource = new MysqlDataSource();
        mysqlDataSource.setUser("gavin");
        mysqlDataSource.setPassword("123456");
        mysqlDataSource.setUrl("jdbc:mysql://39.100.17.31:3306/user_195");
        return mysqlDataSource;
    }

    @Bean("factoryBean195")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db195") DataSource dataSource) throws Exception{
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();

        sessionFactoryBean.setMapperLocations(patternResolver.getResources("mybatis/db195/*.xml"));
        return sessionFactoryBean;
    }

    @Bean("tm195")
    public PlatformTransactionManager platformTransactionManager(@Qualifier("db195") DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }
}

DB197Config

package com.icodingedu.config;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(value = "com.icodingedu.mapper.db197",sqlSessionFactoryRef = "factoryBean197")
public class DB197Config {

    @Bean("db197")
    public DataSource db197(){
        MysqlDataSource mysqlDataSource = new MysqlDataSource();
        mysqlDataSource.setUser("gavin");
        mysqlDataSource.setPassword("123456");
        mysqlDataSource.setUrl("jdbc:mysql://39.100.19.243:3306/user_197");
        return mysqlDataSource;
    }

    @Bean("factoryBean197")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db197") DataSource dataSource) throws Exception{
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();

        sessionFactoryBean.setMapperLocations(patternResolver.getResources("mybatis/db197/*.xml"));
        return sessionFactoryBean;
    }

    @Bean("tm197")
    public PlatformTransactionManager platformTransactionManager(@Qualifier("db197") DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }
}

195Mapper

package com.icodingedu.mapper.db195;

import com.icodingedu.pojo.UserInfo195;
import org.springframework.stereotype.Repository;

@Repository
public interface UserMapper195 {
    int update(UserInfo195 userInfo195);
    UserInfo195 queryById(int id);
}

197Mapper

package com.icodingedu.mapper.db197;

import com.icodingedu.pojo.UserInfo197;
import org.springframework.stereotype.Repository;

@Repository
public interface UserMapper197 {
    int update(UserInfo197 userInfo197);
    UserInfo197 queryById(int id);
}

195 POJO

package com.icodingedu.pojo;

import lombok.Data;
import org.springframework.stereotype.Component;

@Data
public class UserInfo195 {
    private int id;
    private String username;
    private int account;
}

197 POJO

package com.icodingedu.pojo;

import lombok.Data;
import org.springframework.stereotype.Component;

@Data
public class UserInfo197 {
    private int id;
    private String username;
    private int account;
}

195 xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.UserMapper195">
    <update id="update" parameterType="com.icodingedu.pojo.UserInfo195">
        update user_info set account=#{account} where id=#{id}
    </update>
    <select id="queryById" resultType="com.icodingedu.pojo.UserInfo195">
        select * from user_info where id=#{id}
    </select>
</mapper>

197 xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.UserMapper197">
    <update id="update" parameterType="com.icodingedu.pojo.UserInfo197">
        update user_info set account=#{account} where id=#{id}
    </update>
    <select id="queryById" resultType="com.icodingedu.pojo.UserInfo197">
        select * from user_info where id=#{id}
    </select>
</mapper>

service

package com.icodingedu.service;

import com.icodingedu.mapper.db195.UserMapper195;
import com.icodingedu.mapper.db197.UserMapper197;
import com.icodingedu.pojo.UserInfo195;
import com.icodingedu.pojo.UserInfo197;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class UserInfoService {

    @Autowired
    UserMapper195 userMapper195;

    @Autowired
    UserMapper197 userMapper197;

    @Transactional(transactionManager = "tm195")
    public void transfer(){
        UserInfo195 userInfo195 = userMapper195.queryById(1);
        userInfo195.setAccount(userInfo195.getAccount()-200);
        userMapper195.update(userInfo195);

        try {
            UserInfo197 userInfo197 = userMapper197.queryById(1);
            userInfo197.setAccount(userInfo197.getAccount() + 200);
            userMapper197.update(userInfo197);

            int i = 1 / 0;
        }catch (Exception ex){
            ex.printStackTrace();
            UserInfo197 userInfo197 = userMapper197.queryById(1);
            userInfo197.setAccount(userInfo197.getAccount() - 200);
            userMapper197.update(userInfo197);
            throw ex;
        }
        System.out.println("执行完毕********************");
        
    }
}
  • 如果多个接口API怎么办?
  • 不同的系统API调用是无法通过事务异常直接控制,补偿就更复杂了
  • 整体设计对开发人员要求比较高
  • TCC在实际生产中还是有很多难点的

11. 基于本地消息的最终一致性方案设计

  • 采用BASE原理实现,保障事务的最终一致性

  • 设计的过程中是否采用最终一致性需要根据业务来进行评估

  • 基于本地消息的方式,将本事务外的操作,记录在消息表中

    • 举例:下订单-支付,这就是两个事务
  • 其他事务提供操作接口(支付成功后如果直接调用订单接口)

  • 定时轮询的方式将未执行的消息发送给操作接口

  • 操作接口返回失败记录失败的标识,需要设置retry次数

  • 超过retry的次数后不再进行消息发送并记录我们的失败状态

  • 重试后没有成功的就可以通过人工补偿

image-20200322201614603.png

这个图例的理解:

  • 业务表/消息表:支付的业务和支付成功的通知

    • 支付相关信息记录到业务表

    • 将需要通知订单的消息记录到消息表中

    • 定时任务通过定时轮询消息表来获取哪个订单需要发送确认消息

    • 定时任务通过接口修改业务表状态

      • 如果成功更新消息表状态

      • 失败就不断轮询并记录轮询次数,超次数后标记并不再调用

  • 业务表:这就是订单信息

    • 记录订单业务数据

    • 支付成功后订单状态

  • 订单的支付状态在一段时间不一致,但最终一致

基于本地消息的特点

  • 优点:

    • 将事务拆分没有同时操作两个数据库,每一步之操作自己的数据库,保证事务完整性

    • 避免了分布式事务,实现最终一致性

  • 缺点:

    • 要注意重试时的幂等性操作

12. 基于本地消息的最终一致性代码实现

支付的消息表

CREATE TABLE `pay_msg` (
 `id` int(11) NOT NULL,
 `order_id` int(11) NOT NULL,
 `status` int(11) NOT NULL DEFAULT '0',
 `fail_count` int(11) NOT NULL DEFAULT '0',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

支付的信息表

CREATE TABLE `user_account` (
 `id` int(11) NOT NULL,
 `username` varchar(255) NOT NULL,
 `account` int(11) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

订单信息表

CREATE TABLE `order_info` (
 `id` int(11) NOT NULL,
 `order_status` int(11) NOT NULL,
 `order_amount` int(11) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

POJO代码

package com.icodingedu.pojo;
​
import lombok.Data;
​
@Data
public class PayMsg {
 private int id;
 private int order_id;
 private int status;
 private int fail_count;
}
package com.icodingedu.pojo;
​
import lombok.Data;
​
@Data
public class UserAccount {
 private int id;
 private String username;
 private int account;
}
package com.icodingedu.pojo;
​
import lombok.Data;
​
@Data
public class OrderInfo {
 private int id;
 private int order_status;
 private int order_amount;
}

Mapper代码

package com.icodingedu.mapper.db195;
​
import com.icodingedu.pojo.PayMsg;
import org.springframework.stereotype.Repository;
​
import java.util.List;
​
@Repository
public interface PayMsgMapper {
 int insertPayMs(PayMsg payMsg);
 int updatePayMsg(PayMsg payMsg);
 List<PayMsg> queryNoSend();
 PayMsg queryForId(int order_id);
}
package com.icodingedu.mapper.db195;
​
import com.icodingedu.pojo.UserAccount;
import org.springframework.stereotype.Repository;
​
@Repository
public interface UserAccountMapper {
 int updateUserAccount(UserAccount userAccount);
 UserAccount queryForId(int id);
}
package com.icodingedu.mapper.db197;
​
import com.icodingedu.pojo.OrderInfo;
import org.springframework.stereotype.Repository;
​
@Repository
public interface OrderInfoMapper {
 int updateOrderInfo(OrderInfo orderInfo);
 OrderInfo queryForId(int id);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.PayMsgMapper">
 <insert id="insertPayMs" parameterType="com.icodingedu.pojo.PayMsg">
 insert into pay_msg(id,order_id,status,fail_count) values(#{id},#{order_id},#{status},#{fail_count})
 </insert>
 <update id="updatePayMsg" parameterType="com.icodingedu.pojo.PayMsg">
 update pay_msg set status=#{status},fail_count=#{fail_count} where id=#{id}
 </update>
 <select id="queryNoSend" resultType="com.icodingedu.pojo.PayMsg">
 select * from pay_msg where status=0
 </select>
 <select id="queryForId" resultType="com.icodingedu.pojo.PayMsg">
 select * from pay_msg where order_id=#{order_id}
 </select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.UserAccountMapper">
 <update id="updateUserAccount" parameterType="com.icodingedu.pojo.UserAccount">
 update user_account set account=#{account} where id=#{id}
 </update>
 <select id="queryForId" resultType="com.icodingedu.pojo.UserAccount">
 select * from user_account where id=#{id}
 </select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.OrderInfoMapper">
 <update id="updateOrderInfo" parameterType="com.icodingedu.pojo.OrderInfo">
 update order_info set order_status=#{order_status},order_amount=#{order_amount} where id=#{id}
 </update>
 <select id="queryForId" resultType="com.icodingedu.pojo.OrderInfo">
 select * from order_info where id=#{id}
 </select>
</mapper>

service

package com.icodingedu.service;
​
import com.icodingedu.mapper.db197.OrderInfoMapper;
import com.icodingedu.pojo.OrderInfo;
import com.mysql.cj.x.protobuf.MysqlxCrud;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
@Service
public class OrderService {
 @Autowired
 OrderInfoMapper orderInfoMapper;
​
 /**
 *
 * @param order_id
 * @return:0-更新成功,1-订单不存在
 */
 public int handleOrder(int order_id){
 OrderInfo orderInfo = orderInfoMapper.queryForId(order_id);
 if(orderInfo==null){
 return 1;
 }
 orderInfo.setOrder_status(1);
 orderInfoMapper.updateOrderInfo(orderInfo);
 return 0;
 }
}
package com.icodingedu.service;
​
import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.pojo.PayMsg;
import jdk.internal.dynalink.linker.LinkerServices;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
​
import java.util.List;
​
@Service
public class OrderNotifyService {
​
 @Autowired
 PayMsgMapper payMsgMapper;
​
 @Scheduled(cron = "0/5 * * * * ?")
 public void orderNotify() throws Exception{
 System.out.println("进入cron************");
 List<PayMsg> payMsgList = payMsgMapper.queryNoSend();
 if(payMsgList==null||payMsgList.size()==0){
 return;
 }
 for (PayMsg payMsg: payMsgList) {
 int order_id = payMsg.getOrder_id();
 //http://localhost:8080/handleorder?id=2001
 CloseableHttpClient httpClient = HttpClientBuilder.create().build();
 HttpGet httpGet = new HttpGet("http://localhost:8080/handleorder?id="+order_id);
 CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
 String response = EntityUtils.toString(httpResponse.getEntity());
 System.out.println("************调用结果:"+response);
 if("success".equals(response)){
 payMsg.setStatus(1);
 }else{
 int count = payMsg.getFail_count();
 payMsg.setFail_count(count+1);
 if(count+1>5){
 payMsg.setStatus(2);
 }
 }
 payMsgMapper.updatePayMsg(payMsg);
 }
 }
}
package com.icodingedu.service;
​
import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.mapper.db195.UserAccountMapper;
import com.icodingedu.pojo.PayMsg;
import com.icodingedu.pojo.UserAccount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
​
@Service
public class PaymentService {
​
 @Autowired
 UserAccountMapper userAccountMapper;
​
 @Autowired
 PayMsgMapper payMsgMapper;
 /**
 *
 * @param uid
 * @param order_id
 * @param amount
 * @return:0-成功,1-用户不存在,2-余额不足
 */
 @Transactional(transactionManager = "tm195")
 public int payment(int uid,int order_id,int amount){
 UserAccount userAccount = userAccountMapper.queryForId(uid);
 if(userAccount==null){
 return 1;
 }
 int account = userAccount.getAccount();
 if(account<amount){
 return 2;
 }
 userAccount.setAccount(account-amount);
 userAccountMapper.updateUserAccount(userAccount);
​
 PayMsg payMsg = new PayMsg();
 payMsg.setId(1001);
 payMsg.setOrder_id(order_id);
 payMsg.setStatus(0);//0-未发送,1-发送成功,2-超次数
 payMsg.setFail_count(0);
 payMsgMapper.insertPayMs(payMsg);
 return 0;
 }
}

controller

package com.icodingedu.controller;
​
import com.icodingedu.service.MsgSendService;
import com.icodingedu.service.PaymentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
​
@Controller
public class PaymentController {
 @Autowired
 PaymentService paymentService;
​
 @GetMapping("/payment")
 @ResponseBody
 public String payment(int uid,int orderid,int amount){
 int status = paymentService.payment(uid,orderid,amount);
 return "支付成功: "+status;
 }
}
package com.icodingedu.controller;
​
import com.icodingedu.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
​
@Controller
public class OrderController {
​
 @Autowired
 OrderService orderService;
​
 @GetMapping("/handleorder")
 @ResponseBody
 public String orderSet(int id){
 try {
 int flag = orderService.handleOrder(id);
 if (flag == 0) {
 return "success";
 } else {
 return "fail";
 }
 }catch (Exception ex){
 ex.printStackTrace();
 return "fail";
 }
 }
}

Application

package com.icodingedu;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
​
@SpringBootApplication
@EnableScheduling
public class TccTransactionApplication {
​
 public static void main(String[] args) {
 SpringApplication.run(TccTransactionApplication.class, args);
 }
​
}

13. 基于消息队列MQ的最终一致性方案设计

  • 原理和流程类似我们的本本地消息

  • 不同点

    • 本地消息表改为MQ

    • 定时任务的职责直接由MQ的消费者来担任了

image-20200322213147932.png

基于MQ实现最终一致性的问题分析

  • 不依赖于定时任务的周期验证,基于MQ更高效、更可靠

  • 适合自己企业内部系统调用

  • 不同企业的系统之间无法基于MQ,本地消息更适合(第三方应用一般都是通过网络访问)

具体逻辑实现

  • 扣减余额后就发送订单更新消息通知给MQ

  • 消费端接收消息成功后更新订单状态并返回ACK给MQ

  • 如果消费端消费失败则NACK重回队列(回到队首)并记录消费失败次数,超过这个次数就ACK到MQ,然后人工在消息记录里进行补偿

image-20200323001802383.png

14. 基于消息队列MQ的最终一致性代码实现

POM依赖

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

配置发送和接收

spring:
 rabbitmq:
 host: 39.100.17.31
 port: 5672
 username: guest
 password: guest
 virtual-host: /
 connection-timeout: 10000
 listener:
 simple:
 concurrency: 1
 max-concurrency: 1
 auto-startup: true
 prefetch: 1
 acknowledge-mode: manual

POJO

package com.icodingedu.pojo;
​
import lombok.Data;
​
@Data
public class ReceiveMsg {
 private int id;
 private int order_id;
 private int fail_count;
}

Mapper

package com.icodingedu.mapper.db197;
​
import com.icodingedu.pojo.ReceiveMsg;
import org.springframework.stereotype.Repository;
​
@Repository
public interface ReceiveMsgMapper {
 int insert(ReceiveMsg receiveMsg);
 int update(ReceiveMsg receiveMsg);
 ReceiveMsg queryForOrderId(int order_id);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.ReceiveMsgMapper">
 <insert id="insert" parameterType="com.icodingedu.pojo.ReceiveMsg">
 insert into receive_msg(id,order_id,fail_count) values(#{id},#{order_id},#{fail_count})
 </insert>
 <update id="update" parameterType="com.icodingedu.pojo.ReceiveMsg">
 update receive_msg set fail_count=#{fail_count} where id=#{id}
 </update>
 <select id="queryForOrderId" resultType="com.icodingedu.pojo.ReceiveMsg">
 select * from receive_msg where order_id=#{order_id}
 </select>
</mapper>

controller

package com.icodingedu.controller;
​
import com.icodingedu.service.MsgSendService;
import com.icodingedu.service.PaymentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
​
@Controller
public class PaymentController {
 @Autowired
 PaymentService paymentService;
​
 @Autowired
 MsgSendService msgSendService;
​
 @GetMapping("/payment")
 @ResponseBody
 public String payment(int uid,int orderid,int amount){
 int status = paymentService.payment(uid,orderid,amount);
 msgSendService.sendMessage("msg1001",String.valueOf(orderid));
 return "支付成功: "+status;
 }
}

service

package com.icodingedu.service;
​
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
@Service
public class MsgSendService {
 @Autowired
 RabbitTemplate rabbitTemplate;
​
 public void sendMessage(String mid,String msg){
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(mid);
 rabbitTemplate.convertAndSend("order-exchange","receive.info",msg,correlationData);
 }
}
package com.icodingedu.service;
​
import com.icodingedu.mapper.db197.ReceiveMsgMapper;
import com.icodingedu.pojo.ReceiveMsg;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.Map;
​
@Service
public class OrderReceiveService {
​
 @Autowired
 OrderService orderService;
​
 @Autowired
 ReceiveSendService receiveSendService;
​
 @Autowired
 ReceiveMsgMapper receiveMsgMapper;
​
 @RabbitListener(queues = "receive-queue")
 @RabbitHandler
 public void onOrderMessage(@Payload String orderid, @Headers Map<String,Object> headers, Channel channel) throws Exception{
 System.out.println("*********消息收到,开始消费*********");
 System.out.println("OrderID :"+orderid);
 Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
​
 try{
 //订单状态发生了改变
 int flag = orderService.handleOrder(Integer.valueOf(orderid));
 if(flag==0){
 channel.basicAck(deliverTag,false);
 //通知发送端我接收到消息了
 receiveSendService.sendMessage("send1001",orderid);
 }else{
 ReceiveMsg receiveMsg = receiveMsgMapper.queryForOrderId(Integer.valueOf(orderid));
 if(receiveMsg==null){
 receiveMsg = new ReceiveMsg();
 receiveMsg.setId(3001);
 receiveMsg.setFail_count(1);
 receiveMsg.setOrder_id(Integer.valueOf(orderid));
 receiveMsgMapper.insert(receiveMsg);
 channel.basicNack(deliverTag,false,true);
 }else{
 int fail_count = receiveMsg.getFail_count();
 if(fail_count>5){
 channel.basicAck(deliverTag,false);
 }else{
 receiveMsg.setFail_count(fail_count+1);
 receiveMsgMapper.update(receiveMsg);
 channel.basicNack(deliverTag,false,true);
 }
 }
 }
 }catch (Exception ex){
 ex.printStackTrace();
 ReceiveMsg receiveMsg = receiveMsgMapper.queryForOrderId(Integer.valueOf(orderid));
 if(receiveMsg==null){
 receiveMsg = new ReceiveMsg();
 receiveMsg.setId(3001);
 receiveMsg.setFail_count(1);
 receiveMsg.setOrder_id(Integer.valueOf(orderid));
 receiveMsgMapper.insert(receiveMsg);
 channel.basicNack(deliverTag,false,true);
 }else{
 int fail_count = receiveMsg.getFail_count();
 if(fail_count>5){
 channel.basicAck(deliverTag,false);
 }else{
 receiveMsg.setFail_count(fail_count+1);
 receiveMsgMapper.update(receiveMsg);
 channel.basicNack(deliverTag,false,true);
 }
 }
 }
 }
}
package com.icodingedu.service;
​
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
@Service
public class ReceiveSendService {
​
 @Autowired
 RabbitTemplate rabbitTemplate;
​
 public void sendMessage(String mid,String msg){
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(mid);
 rabbitTemplate.convertAndSend("order-exchange","send.confirm",msg,correlationData);
 }
}
package com.icodingedu.service;
​
import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.pojo.PayMsg;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
​
import java.util.Map;
​
@Service
public class SendReceiveService {
​
 @Autowired
 PayMsgMapper payMsgMapper;
​
 @RabbitListener(queues = "sendconfirm-queue")
 @RabbitHandler
 public void onOrderMessage(@Payload String orderid, @Headers Map<String,Object> headers, Channel channel) throws Exception{
 System.out.println("===========接收发送确认的消息============");
 System.out.println("=====orderid "+orderid);
 Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
 PayMsg payMsg = payMsgMapper.queryForId(Integer.valueOf(orderid));
 if(payMsg!=null){
 payMsg.setStatus(1);
 payMsgMapper.updatePayMsg(payMsg);
 channel.basicAck(deliverTag,false);
 }
 }
}

相关文章

  • 微服务分布式事务--破局

    微服务架构下分布式事务设计实战 商品 订单 支付 分布式事务->长事务本地事务->短事务 分布式事务: 比如 下...

  • 分布式事务

    目录 分布式事务解决方案 长事务: saga 短事务: 设计的时候尽量短事务,能不用分布式事务尽量不用,分布式事务...

  • 分布式事务

    分布式事务是分布式容错设计方式,与分布式事务相关的我们可以从理论到相关技术,最后到设计方法。 分布式事务理论 从理...

  • 第二章架构设计之技术实践篇(下)

    本章要点 分布式事务设计与实践 服务降级设计 服务限流/熔断设计 服务灰度发布设计 1.分布式事务设计与实践 分布...

  • RocketMQ-分布式消息

    rocketMQ分布式事务架构设计rocketMQ分布式事务架构.png首先发送消息并异步执行本地事务(发送成功后...

  • 一、分布式事务的应用场景

    现在提起分布式事务中的“事务”,和传统的数据库事务中的“事务”严格意义上已经不是完全等同的了。 设计一个分布式事务...

  • 分布式事务相关文章

    1. 干货 | 一篇文章带你学习分布式事务 ---蚂蚁金服 2. 分布式事务之TCC服务设计和实现注意事项 ---...

  • 分布式事务与分布式锁

    一、分布式事务 什么事分布式事务 分布式事务就是指事务的资源分别位于不同的分布式系统的不同节点之上的事务。 分布式...

  • 分布式事务

    分布式事务 怎么处理分布式事务如果能在设计的时候避免就尽量避免考察业务出错的原因,如果出错的频率比较低,那可可以考...

  • ATOMIKOS+JTA分布式事务记录

    ATOMIKOS+JTA是用来分布式事务的中间件,那么什么是分布式事务呢? 事务,分为单机事务,分布式事务;单机事...

网友评论

      本文标题:分布式事务设计

      本文链接:https://www.haomeiwen.com/subject/jvadbktx.html