美文网首页
Spring Boot 分布式事务管理

Spring Boot 分布式事务管理

作者: 喊我小王吧 | 来源:发表于2020-05-02 23:36 被阅读0次

    Spring Boot 分布式事务管理

    前言

    事务是为了保证数据的一致性而产生的。那么分布式事务,顾名思义,就是我们要保证分布在不同数据库、不同服务器、不同应用之间的数据一致性。

    在单体项目下数据是存放在一个数据库上的,采用数据库的事务就能满足我们的要求。
    但随着业务的不断扩张、数据的不断增加,单一数据库已经到达了瓶颈,因此我们需要对数据库进行分库分表。为了保证数据的一致性,不同数据库之间的操作必须要么同时成功,要么同时失败,否则就会产生脏数据,导致数据不一致。

    对此,分布式事务就诞生了。

    应用场景

    在分布式系统中,例如支付系统,支付成功后需要同时对买家和卖家进行操作:买家减钱,卖家加钱。这两步必须放在同一个事务里执行,不然出现扣了钱却没买着货的情况,那就尴尬了哈哈哈哈哈。所以分布式事务这时候就堪大用。类似的场景还有银行转账等等。

    Spring Boot中实现分布式事务

    SpringBoot 集成 Atomikos 实现分布式事务

    Atomikos 是一个为 Java 平台提供增值服务的开源类事务管理器。

    以下是包括在这个开源版本中的一些功能:

    • 全面崩溃 / 重启恢复;
    • 兼容标准的 SUN 公司 JTA API;
    • 嵌套事务;
    • 为 XA 和非 XA 提供内置的 JDBC 适配器。

    注释:XA 协议最早由 Tuxedo 提出,并交给 X/Open 组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2 和 Sybase 等各大数据库厂家都提供对 XA 的支持。XA 协议采用两阶段提交方式来管理分布式事务。XA 接口提供资源管理器与事务管理器之间进行通信的标准接口。XA 协议包括两套函数,以 xa_ 开头的及以 ax_ 开头的。

    代码实现

    建库

    在本地创建两个数据库:test01、test02,并分别在其中创建结构相同的表(test01 中为 test_user1,test02 中为 test_user2):

    sql脚本

    -- ----------------------------
    -- Table structure for test_user1
    -- Target table of the insert statement declared in UserMapper1.
    -- NOTE(review): presumably meant to be created in database test01 - confirm.
    -- ----------------------------
    DROP TABLE IF EXISTS `test_user1`;
    CREATE TABLE `test_user1`  (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `age` int(11) NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
    
    -- NOTE(review): no matching "SET FOREIGN_KEY_CHECKS = 0" appears in this script;
    -- this looks like a leftover from an export tool - confirm it is needed.
    SET FOREIGN_KEY_CHECKS = 1;
    
    
    
    -- ----------------------------
    -- Table structure for test_user2
    -- Target table of the insert statement declared in UserMapper2.
    -- NOTE(review): presumably meant to be created in database test02 - confirm.
    -- ----------------------------
    DROP TABLE IF EXISTS `test_user2`;
    CREATE TABLE `test_user2`  (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `age` int(11) NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
    
    -- NOTE(review): no matching "SET FOREIGN_KEY_CHECKS = 0" appears in this script;
    -- this looks like a leftover from an export tool - confirm it is needed.
    SET FOREIGN_KEY_CHECKS = 1;
    
    

    pom中 添加 Atomikos 依赖

    <!-- JTA transaction manager (Atomikos); version is managed by the Spring Boot parent. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.5</version>
    </dependency>

    <!--
      Must stay on the Connector/J 5.1.x line: the data source configuration imports
      com.mysql.jdbc.jdbc2.optional.MysqlXADataSource, which belongs to that line.
      "5.5" is not a published Connector/J version.
    -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    

    添加数据库配置

    
    # HTTP port; the test request in this article targets http://127.0.0.1:8080/test.
    server:
      port: 8080
    spring:
      # NOTE(review): none of the code shown in this article reads these Redis settings -
      # confirm they are needed.
      redis:
        host: localhost
        port: 6379
    # Custom property tree. Each child is bound to a properties class through
    # @ConfigurationProperties(prefix = "mysql.datasource.<name>").
    mysql:
      datasource:
        # Bound to DBConfig1 and used by MyBatisConfig1 (database test01).
        test1:
          # NOTE(review): port 3307 is used here; confirm it matches the local MySQL instance.
          url: jdbc:mysql://localhost:3307/test01?useUnicode=true&characterEncoding=utf-8
          username: root
          password: root
          # The values below are passed to the AtomikosDataSourceBean setters of the same name.
          minPoolSize: 3
          maxPoolSize: 25
          maxLifetime: 20000
          borrowConnectionTimeout: 30
          loginTimeout: 30
          maintenanceInterval: 60
          maxIdleTime: 60
          testQuery: select 1
        # Bound to DBConfig2 and used by MyBatisConfig2 (database test02).
        test2:
          # NOTE(review): port 3307 is used here; confirm it matches the local MySQL instance.
          url: jdbc:mysql://localhost:3307/test02?useUnicode=true&characterEncoding=utf-8
          username: root
          password: root
          # The values below are passed to the AtomikosDataSourceBean setters of the same name.
          minPoolSize: 3
          maxPoolSize: 25
          maxLifetime: 20000
          borrowConnectionTimeout: 30
          loginTimeout: 30
          maintenanceInterval: 60
          maxIdleTime: 60
          testQuery: select 1
    
    

    数据源配置文件读取

    DBConfig1

    package com.example.springbootatomikos.config.pojo;
    
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * Connection and pool settings for data source "test1", bound from the
     * {@code mysql.datasource.test1} configuration prefix.
     *
     * <p>NOTE(review): {@code @SpringBootConfiguration} is normally placed on the single
     * application class; confirm whether a plain configuration/component annotation was
     * intended for this properties holder.
     *
     * @author Administrator
     */
    @SpringBootConfiguration
    @ConfigurationProperties(prefix = "mysql.datasource.test1")
    public class DBConfig1 {

        // JDBC connection settings.
        private String url;
        private String username;
        private String password;

        // Pool sizing.
        private int minPoolSize;
        private int maxPoolSize;

        // Pool timing values; they are handed unchanged to the pooled data source.
        private int maxLifetime;
        private int borrowConnectionTimeout;
        private int loginTimeout;
        private int maintenanceInterval;
        private int maxIdleTime;

        // Statement handed to the pool as its test query.
        private String testQuery;

        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }

        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }

        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }

        public int getMinPoolSize() { return minPoolSize; }
        public void setMinPoolSize(int minPoolSize) { this.minPoolSize = minPoolSize; }

        public int getMaxPoolSize() { return maxPoolSize; }
        public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; }

        public int getMaxLifetime() { return maxLifetime; }
        public void setMaxLifetime(int maxLifetime) { this.maxLifetime = maxLifetime; }

        public int getBorrowConnectionTimeout() { return borrowConnectionTimeout; }
        public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
            this.borrowConnectionTimeout = borrowConnectionTimeout;
        }

        public int getLoginTimeout() { return loginTimeout; }
        public void setLoginTimeout(int loginTimeout) { this.loginTimeout = loginTimeout; }

        public int getMaintenanceInterval() { return maintenanceInterval; }
        public void setMaintenanceInterval(int maintenanceInterval) {
            this.maintenanceInterval = maintenanceInterval;
        }

        public int getMaxIdleTime() { return maxIdleTime; }
        public void setMaxIdleTime(int maxIdleTime) { this.maxIdleTime = maxIdleTime; }

        public String getTestQuery() { return testQuery; }
        public void setTestQuery(String testQuery) { this.testQuery = testQuery; }
    }
    
    

    DBConfig2

    package com.example.springbootatomikos.config.pojo;
    
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * Connection and pool settings for data source "test2", bound from the
     * {@code mysql.datasource.test2} configuration prefix.
     *
     * <p>NOTE(review): {@code @SpringBootConfiguration} is normally placed on the single
     * application class; confirm whether a plain configuration/component annotation was
     * intended for this properties holder.
     *
     * @author Administrator
     */
    @SpringBootConfiguration
    @ConfigurationProperties(prefix = "mysql.datasource.test2")
    public class DBConfig2 {

        // JDBC connection settings.
        private String url;
        private String username;
        private String password;

        // Pool sizing.
        private int minPoolSize;
        private int maxPoolSize;

        // Pool timing values; they are handed unchanged to the pooled data source.
        private int maxLifetime;
        private int borrowConnectionTimeout;
        private int loginTimeout;
        private int maintenanceInterval;
        private int maxIdleTime;

        // Statement handed to the pool as its test query.
        private String testQuery;

        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }

        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }

        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }

        public int getMinPoolSize() { return minPoolSize; }
        public void setMinPoolSize(int minPoolSize) { this.minPoolSize = minPoolSize; }

        public int getMaxPoolSize() { return maxPoolSize; }
        public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; }

        public int getMaxLifetime() { return maxLifetime; }
        public void setMaxLifetime(int maxLifetime) { this.maxLifetime = maxLifetime; }

        public int getBorrowConnectionTimeout() { return borrowConnectionTimeout; }
        public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
            this.borrowConnectionTimeout = borrowConnectionTimeout;
        }

        public int getLoginTimeout() { return loginTimeout; }
        public void setLoginTimeout(int loginTimeout) { this.loginTimeout = loginTimeout; }

        public int getMaintenanceInterval() { return maintenanceInterval; }
        public void setMaintenanceInterval(int maintenanceInterval) {
            this.maintenanceInterval = maintenanceInterval;
        }

        public int getMaxIdleTime() { return maxIdleTime; }
        public void setMaxIdleTime(int maxIdleTime) { this.maxIdleTime = maxIdleTime; }

        public String getTestQuery() { return testQuery; }
        public void setTestQuery(String testQuery) { this.testQuery = testQuery; }
    }
    
    
    

    数据源配置

    MyBatisConfig1

    package com.example.springbootatomikos.config.one;
    
    import com.example.springbootatomikos.config.pojo.DBConfig1;
    import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Primary;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    /**
     * MyBatis and XA data source wiring for data source 1.
     *
     * <p>Mappers found under {@code com.example.springbootatomikos.mapper.one} are bound to the
     * {@code sqlSessionTemplate} bean declared here. All beans in this class are marked
     * {@code @Primary}.
     *
     * @author Administrator
     */
    @SpringBootConfiguration
    @MapperScan(basePackages = "com.example.springbootatomikos.mapper.one", sqlSessionTemplateRef = "sqlSessionTemplate")
    public class MyBatisConfig1 {

        /**
         * Builds the Atomikos-managed XA data source named {@code dataSource}.
         *
         * @param config connection and pool settings read from {@code mysql.datasource.test1}
         * @return the pooled data source wrapping a MySQL XA data source
         * @throws SQLException declared because the data source setters may throw it
         */
        @Primary
        @Bean(name = "dataSource")
        public DataSource dataSource(DBConfig1 config) throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(config.getUrl());
            mysqlXaDataSource.setUser(config.getUsername());
            mysqlXaDataSource.setPassword(config.getPassword());
            // One call is enough; this flag is a plain boolean setter.
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            // Resource name differs from the one used in MyBatisConfig2 ("dataSource2").
            xaDataSource.setUniqueResourceName("dataSource");

            // Pool settings are copied one-to-one from the bound configuration.
            xaDataSource.setMinPoolSize(config.getMinPoolSize());
            xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
            xaDataSource.setMaxLifetime(config.getMaxLifetime());
            xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
            xaDataSource.setLoginTimeout(config.getLoginTimeout());
            xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
            xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
            xaDataSource.setTestQuery(config.getTestQuery());
            return xaDataSource;
        }

        /**
         * Creates the MyBatis session factory backed by the {@code dataSource} bean.
         *
         * @param dataSource the data source qualified as {@code dataSource}
         * @return the session factory produced by {@link SqlSessionFactoryBean}
         * @throws Exception if the factory bean fails to build the session factory
         */
        @Primary
        @Bean(name = "sqlSessionFactory")
        public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource)
                throws Exception {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            return bean.getObject();
        }

        /**
         * Creates the session template referenced by this class's {@code @MapperScan}.
         *
         * @param sqlSessionFactory the factory qualified as {@code sqlSessionFactory}
         * @return a template wrapping the given factory
         * @throws Exception kept in the signature for compatibility with existing callers
         */
        @Primary
        @Bean(name = "sqlSessionTemplate")
        public SqlSessionTemplate sqlSessionTemplate(
                @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    }
    
    

    项目结构图

    在这里插入图片描述

    MyBatisConfig2

    package com.example.springbootatomikos.config.two;
    
    import com.example.springbootatomikos.config.pojo.DBConfig2;
    import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    /**
     * MyBatis and XA data source wiring for data source 2.
     *
     * <p>Mappers found under {@code com.example.springbootatomikos.mapper.two} are bound to the
     * {@code sqlSessionTemplate2} bean declared here.
     *
     * @author Administrator
     */
    @SpringBootConfiguration
    // Keep each data source's mappers in their own base package; putting them in the same
    // folder may cause errors.
    @MapperScan(basePackages = "com.example.springbootatomikos.mapper.two", sqlSessionTemplateRef = "sqlSessionTemplate2")
    public class MyBatisConfig2 {

        /**
         * Builds the Atomikos-managed XA data source named {@code dataSource2}.
         *
         * @param config connection and pool settings read from {@code mysql.datasource.test2}
         * @return the pooled data source wrapping a MySQL XA data source
         * @throws SQLException declared because the data source setters may throw it
         */
        @Bean(name = "dataSource2")
        public DataSource dataSource(DBConfig2 config) throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(config.getUrl());
            mysqlXaDataSource.setUser(config.getUsername());
            mysqlXaDataSource.setPassword(config.getPassword());
            // One call is enough; this flag is a plain boolean setter.
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            // Resource name differs from the one used in MyBatisConfig1 ("dataSource").
            xaDataSource.setUniqueResourceName("dataSource2");

            // Pool settings are copied one-to-one from the bound configuration.
            xaDataSource.setMinPoolSize(config.getMinPoolSize());
            xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
            xaDataSource.setMaxLifetime(config.getMaxLifetime());
            xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
            xaDataSource.setLoginTimeout(config.getLoginTimeout());
            xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
            xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
            xaDataSource.setTestQuery(config.getTestQuery());
            return xaDataSource;
        }

        /**
         * Creates the MyBatis session factory backed by the {@code dataSource2} bean.
         *
         * @param dataSource the data source qualified as {@code dataSource2}
         * @return the session factory produced by {@link SqlSessionFactoryBean}
         * @throws Exception if the factory bean fails to build the session factory
         */
        @Bean(name = "sqlSessionFactory2")
        public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource2") DataSource dataSource)
                throws Exception {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            return bean.getObject();
        }

        /**
         * Creates the session template referenced by this class's {@code @MapperScan}.
         *
         * @param sqlSessionFactory the factory qualified as {@code sqlSessionFactory2}
         * @return a template wrapping the given factory
         * @throws Exception kept in the signature for compatibility with existing callers
         */
        @Bean(name = "sqlSessionTemplate2")
        public SqlSessionTemplate sqlSessionTemplate(
                @Qualifier("sqlSessionFactory2") SqlSessionFactory sqlSessionFactory) throws Exception {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    }
    
    

    Mapper配置

    UserMapper1

    /**
     * MyBatis mapper that inserts rows into {@code test_user1}.
     */
    @Mapper
    public interface UserMapper1 {

        /**
         * Inserts one row using the given values.
         *
         * @param name value bound to the {@code #{name}} placeholder
         * @param age value bound to the {@code #{age}} placeholder
         */
        @Insert("insert into test_user1(name,age) values(#{name},#{age})")
        void addUser(
                @Param("name") String name,
                @Param("age") int age);
    }
    
    
    

    UserMapper2

    /**
     * MyBatis mapper that inserts rows into {@code test_user2}.
     */
    @Mapper
    public interface UserMapper2 {

        /**
         * Inserts one row using the given values.
         *
         * @param name value bound to the {@code #{name}} placeholder
         * @param age value bound to the {@code #{age}} placeholder
         */
        @Insert("insert into test_user2(name,age) values(#{name},#{age})")
        void addUser(
                @Param("name") String name,
                @Param("age") int age);
    }
    
    

    User类

    /**
     * Plain mutable holder for a user's id, name and age.
     */
    public class User {

        private Long id;
        private String name;
        private int age;

        /** @return the id, or {@code null} if it was never set */
        public Long getId() { return this.id; }

        /** @param id the id to store */
        public void setId(Long id) { this.id = id; }

        /** @return the name, or {@code null} if it was never set */
        public String getName() { return this.name; }

        /** @param name the name to store */
        public void setName(String name) { this.name = name; }

        /** @return the age; {@code 0} if it was never set */
        public int getAge() { return this.age; }

        /** @param age the age to store */
        public void setAge(int age) { this.age = age; }
    }
    
    
    

    Service类

    /**
     * Service that writes the same user through both mappers inside one transaction.
     */
    @Service
    public class UserService {

        @Autowired
        private UserMapper1 userMapper1;
        @Autowired
        private UserMapper2 userMapper2;

        /**
         * Inserts the user's name and age through {@code userMapper2} and then
         * {@code userMapper1}.
         *
         * <p>{@code rollbackFor = Exception.class} is set because this method declares
         * {@code throws Exception}: with the default rollback rules only unchecked exceptions
         * and errors trigger a rollback, so a checked exception would let the first insert be
         * committed. NOTE(review): assumes this is Spring's {@code @Transactional}; confirm
         * against the import.
         *
         * @param user source of the name and age to insert
         * @throws Exception any failure raised while inserting; the transaction is rolled back
         */
        @Transactional(rollbackFor = Exception.class)
        public void addUser(User user) throws Exception {
            userMapper2.addUser(user.getName(), user.getAge());
            // Deliberate failure for the rollback demo: throws ArithmeticException between the
            // two inserts. Remove this line to let both inserts commit.
            int a = 1 / 0;
            userMapper1.addUser(user.getName(), user.getAge());
        }
    }
    
    

    启动类和controller

    
    /**
     * Application entry point that also serves the demo endpoint.
     */
    @SpringBootApplication
    @RestController
    public class SpringbootatomikosApplication {

        @Autowired
        private UserService userService;

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments passed through to Spring
         */
        public static void main(String[] args) {
            SpringApplication.run(SpringbootatomikosApplication.class, args);
        }

        /**
         * Builds a fixed sample user and hands it to {@code userService.addUser}.
         * Any exception is caught and printed, not rethrown.
         */
        @RequestMapping("test")
        public void test() {
            User sample = new User();
            sample.setAge(110);
            sample.setName("test");
            try {
                userService.addUser(sample);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    
    

    测试

    请求 http://127.0.0.1:8080/test
    后台会抛出异常;此时查看两个数据库,数据均未入库,说明事务已经回滚,分布式事务生效。

    然后将service中的 异常去掉,两个数据库数据入库成功。

    相关文章

      网友评论

          本文标题:Spring Boot 分布式事务管理

          本文链接:https://www.haomeiwen.com/subject/opkrghtx.html