美文网首页Spring Cloud
分布式事务 Seata(四) 多数据源事务

分布式事务 Seata(四) 多数据源事务

作者: _大叔_ | 来源:发表于2020-08-10 17:43 被阅读0次

    项目搭建

    我搭建的是一个最基础的用户下单,减库存,减用户金额,创建订单的一个微服务框架。因为后面需要测试微服务下的分布式事务,这里测试的是 多数据源 下的分布式事务。

    项目结构如下:

    |-- demo
      |-- entity 实体对象(为了让其他服务拥有所有服务对象)
      |-- order 订单 (pom导入了 entity )
      |-- stock 库存 (pom导入了 entity )
      |-- user 用户 (pom导入了 entity )
    

    这里使用了mybatis-plus的相关技术,不懂得请自行百度,数据源配置如下:

    spring:
      main:
        allow-bean-definition-overriding: true
      autoconfigure:
        exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
      datasource:
        druid:
          stat-view-servlet:
            enabled: true
            url-pattern: "/druid/*"
            allow: 127.0.0.1
            deny: 192.168.1.73
            reset-enable: true
            login-username: admin
            login-password: admin@2020
          web-stat-filter:
            enabled: true
            url-pattern: "/*"
            exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
        dynamic:
          druid:
            filters: stat,wall
            initial-size: 10
            min-idle: 10
            maxActive: 200
            maxWait: 10000
            useUnfairLock: true
            validation-query: 'select 1'
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
          primary: user
          datasource:
            user:
              url: jdbc:mysql://0.0.0.0:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
            order:
              url: jdbc:mysql://0.0.0.0:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
            stock:
              url: jdbc:mysql://0.0.0.0:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
    

    表结构创建

    所建的表都不在同一个库下,分为order(表为orders)、user、stock三个库,各自的库建各自的表,每个库都要建一个相同的 undo_log 表,如下:

    /*表: orders*/--------------
    /*列信息*/-----------
    Field   Type      Collation  Null    Key     Default  Extra   Privileges                       Comment  
    ------  --------  ---------  ------  ------  -------  ------  -------------------------------  ---------
    id      int       (NULL)     NO      PRI     (NULL)           select,insert,update,references           
    uid     int       (NULL)     YES             (NULL)           select,insert,update,references           
    count   int       (NULL)     YES             (NULL)           select,insert,update,references           
    time    datetime  (NULL)     YES             (NULL)           select,insert,update,references           
    money   int       (NULL)     YES             (NULL)           select,insert,update,references           
    
    /*表: stock*/--------------
    /*列信息*/-----------
    Field   Type    Collation  Null    Key     Default  Extra   Privileges                       Comment  
    ------  ------  ---------  ------  ------  -------  ------  -------------------------------  ---------
    id      int     (NULL)     NO      PRI     (NULL)           select,insert,update,references           
    num     int     (NULL)     YES             (NULL)           select,insert,update,references           
    price   int     (NULL)     YES             (NULL)           select,insert,update,references           
    
    /*表: account*/----------------
    /*列信息*/-----------
    Field   Type    Collation  Null    Key     Default  Extra           Privileges                       Comment  
    ------  ------  ---------  ------  ------  -------  --------------  -------------------------------  ---------
    id      int     (NULL)     NO      PRI     (NULL)   auto_increment  select,insert,update,references           
    amount  int     (NULL)     YES             (NULL)                   select,insert,update,references    
    

    表数据如下:

    stock: id = 1,num = 500,price = 5
    account: id = 1,amount = 2000
    

    undo_log 表结构

    编写过程

    我把所有的接口和实现均写在 user 服务下进行测试,如下图:

    AccountServer

    public interface AccountServer {
        /**
         * 从胡账户中指支出
        **/
        void debit(Integer uid, Integer money);
    
    }
    
    @Service
    public class AccountServerImpl implements AccountServer {
    
    
        @Autowired
        private AccountMapper accountMapper;
    
        @DS("user")
        @Override
        public void debit(Integer uid, Integer money) {
            Account account = accountMapper.selectById(uid);
            account.setAmount(account.getAmount() - money);
            accountMapper.updateById(account);
        }
    }
    

    StockServer

    public interface StockServer {
        /**
         * 扣除库存数量,返回金额
        **/
        Integer deduct(Integer stockId, Integer count);
    }
    
    @Service
    public class StockServerImpl implements StockServer {
    
        @Autowired
        private StockMapper stockMapper;
    
        // 库存这里需要乐观锁,但是这里我就不做了
        @DS("stock")
        @Override
        public Integer deduct(Integer stockId, Integer count) {
            Stock stock = stockMapper.selectById(stockId);
            stock.setNum(stock.getNum() - count);
            stockMapper.updateById(stock);
            return stock.getPrice().intValue() * count.intValue();
        }
    }
    

    OrderServer

    public interface OrderServer {
    
        /**
         * 创建订单
        **/
        void create(Integer uid, Integer stockId, Integer count,Integer money);
    }
    
    @Service
    public class OrderServerImpl implements OrderServer {
    
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private AccountServer accountServer;
    
        @DS("order")
        @Override
        public void create(Integer uid, Integer stockId, Integer count,Integer money) {
    
            accountServer.debit(uid,money);
            Order order = new Order();
            order.setId(uid);
            order.setTime(new Date());
            order.setCount(count);
            order.setMoney(money);
    
            orderMapper.insert(order);
        }
    }
    

    BusinessService

    public interface BusinessService {
        /**
         * 采购
        **/
        void purchase(Integer uid, Integer stockId, Integer count);
    }
    
    @Service
    public class BusinessServiceImpl implements BusinessService {
    
        @Autowired
        private StockServer stockServer;
        @Autowired
        private OrderServer orderServer;
    
        @Override
        public void purchase(Integer uid, Integer stockId, Integer count) {
            Integer money = stockServer.deduct(stockId, count);
            orderServer.create(uid, stockId, count,money);
        }
    }
    

    UserApplication

    @SpringBootApplication
    public class UserApplication implements CommandLineRunner {
    
        @Autowired
        private BusinessService businessService;
    
        public static void main(String[] args) {
            SpringApplication.run(UserApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            /**
             * 执行采购,用户id=1,库存id=1,采购数量count=3
            **/
            businessService.purchase(1,1,3);
        }
    }
    

    无seata事务处理测试

    测试加 @Transactional 是否有效

        @Transactional(rollbackFor = Exception.class)
        @Override
        public void purchase(Integer uid, Integer stockId, Integer count) {
            Integer money = stockServer.deduct(stockId, count);
            orderServer.create(uid, stockId, count,money);
        }
    

    测试结果:

    java.sql.SQLSyntaxErrorException: Table 'user.stock' doesn't exist;
    

    什么原因呢?其实就是 @Transactional 的事务传播策略默认为 Propagation.REQUIRED,如果当前没有事务,就新建一个事务,如果已经存在一个事务,就加入到这个事务中。也就是说不同之间的服务调用使用的是同一个库的事务,所以他就查同一个库下的这张表。

    避免这种情况可以在子服务的方法上加 @Transactional(propagation = Propagation.REQUIRES_NEW),新建事务,如果当前存在事务,把当前事务挂起。意思就是,A调B的过程中,A方法用的是A库的事务,B方法用的是B库的事务,相互独立不受影响。

    代码修改如下:

        // 库存这里需要乐观锁,但是这里我就不做了
        @DS("stock")
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public Integer deduct(Integer stockId, Integer count) {
            Stock stock = stockMapper.selectById(stockId);
            stock.setNum(stock.getNum() - count);
            stockMapper.updateById(stock);
            return stock.getPrice().intValue() * count.intValue();
        }
    
        @DS("order")
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void create(Integer uid, Integer stockId, Integer count,Integer money) {
            accountServer.debit(uid,money);
            Order order = new Order();
            order.setUid(uid);
            order.setTime(new Date());
            order.setCount(count);
            order.setMoney(money);
            if(1==1){
                throw new RuntimeException("下单异常");
            }
            orderMapper.insert(order);
        }
    
        @DS("user")
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void debit(Integer uid, Integer money) {
            Account account = accountMapper.selectById(uid);
            account.setAmount(account.getAmount() - money);
            accountMapper.updateById(account);
        }
    
    测试下单异常
    @Service
    public class OrderServerImpl implements OrderServer {
    
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private AccountServer accountServer;
    
        @DS("order")
        @Override
        public void create(Integer uid, Integer stockId, Integer count,Integer money) {
            accountServer.debit(uid,money);
            Order order = new Order();
            order.setUid(uid);
            order.setTime(new Date());
            order.setCount(count);
            order.setMoney(money);
            if(1==1){
                throw new RuntimeException("下单异常");
            }
            orderMapper.insert(order);
        }
    }
    

    测试结果:

    stock:id=1,num=497,price=5
    account:id=1,amount=1985
    order:
    

    用户账户扣除,库存扣除,下单失败。
    也就是说 @Transactional 处理不了分布式事务,只能处理同一个库的事务。

    添加 Seata 分布式事务

    添加依赖
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>io.seata</groupId>
                        <artifactId>seata-spring-boot-starter</artifactId>
                    </exclusion>
                </exclusions>
                <version>2.2.1.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-spring-boot-starter</artifactId>
                <version>1.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
                <version>1.3.0</version>
            </dependency>
    
    修改配置
    server:
      port: 8082
    
    spring:
      main:
        allow-bean-definition-overriding: true
      autoconfigure:
        exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
      datasource:
        druid:
          stat-view-servlet:
            enabled: true
            url-pattern: "/druid/*"
            allow: 127.0.0.1
            deny: 192.168.1.73
            reset-enable: true
            login-username: admin
            login-password: admin@2020
          web-stat-filter:
            enabled: true
            url-pattern: "/*"
            exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
        dynamic:
          druid:
            filters: stat,wall
            initial-size: 30
            min-idle: 20
            maxActive: 200
            maxWait: 10000
            useUnfairLock: true
            validation-query: 'select 1'
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
          primary: user
          # 启用严格模式
          strict: true
          # 开启分布式事务
          seata: true
          # 事务模式 为AT
          seata-mode: AT
          datasource:
            user:
              url: jdbc:mysql://10.240.30.100:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
              # 建表脚本,启动时会运行
              # schema: classpath:db/schema-account.sql
            order:
              url: jdbc:mysql://10.240.30.100:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
            stock:
              url: jdbc:mysql://10.240.30.100:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
              driver-class-name: com.mysql.cj.jdbc.Driver
              type: com.alibaba.druid.pool.DruidDataSource
              username: dev
              password: mysql@dev.2020
    
    # 事务配置
    seata:
      enabled: true
      # 启用自动代理数据源
      enable-auto-data-source-proxy: false
      # 随便起个名字,但最好与服务名称一致
      application-id: ${spring.application.name}
      # 此处的名称一定要与 service.vgroupMapping 下配置的参数保持一致
      tx-service-group: my_test_tx_group
      # 目的是从nacos 获取配置信息
      config:
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
          username: nacos
          password: nacos@root@2020
          namespace:
          group: SEATA_GROUP
      # registry 目的是从nacos找 seata-server 服务
      registry:
        type: nacos
        nacos:
          application: seata-server
          server-addr: 127.0.0.1:8848
          username: nacos
          password: nacos@root@2020
          namespace:
    

    seata-server 端的config 和 registry 是注册中心 和 配置中心。
    client 配置的 registry 是从naocs所在的注册中心获取seata-server所在的集群或服务,用来连接seata-serve,config 是从naocs 所在config,获取配置。

    关于 nacos 和 seata-server的配置请看 分布式事务 Seata(三) Seata搭建

    添加全局事务
        @Transactional
        @GlobalTransactional(rollbackFor = Exception.class)
        @Override
        public void purchase(Integer uid, Integer stockId, Integer count) {
            Integer money = stockServer.deduct(stockId, count);
            orderServer.create(uid, stockId, count,money);
        }
    
    测试
    stock:id=1,num=500,price=5
    account:id=1,amount=2000
    order:
    

    证明多数据源事务处理成功。

    相关文章

      网友评论

        本文标题:分布式事务 Seata(四) 多数据源事务

        本文链接:https://www.haomeiwen.com/subject/srifdktx.html