美文网首页
分布式事务解决方案之2PC(二)

分布式事务解决方案之2PC(二)

作者: 倚仗听江 | 来源:发表于2021-01-13 13:47 被阅读0次

    上一篇文章对2PC及其解决方案做了一些介绍,这次就通过Seata来实现一下2PC事务

    本示例通过Seata中间件实现分布式事务,模拟三个账户的转账交易过程。
    两个账户在三个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个个微服务。交易过程是,张三给李四转账指定金额。

    bank1库,包含张三账户

    CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'
    DROP TABLE IF EXISTS `account_info`;
    CREATE TABLE `account_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
    主姓名',
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
    卡号',
    `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
    '帐户密码',
    `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
    Dynamic;
    INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
    

    bank2库,包含李四账户

    CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    CREATE TABLE `account_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
    主姓名',
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
    卡号',
    `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
    '帐户密码',
    `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
    Dynamic;
    INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);
    

    undo_log是记录undo和redo日志的,在第二阶段进行异常的时候进行回滚

    CREATE TABLE `undo_log` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(100) NOT NULL,
    `context` varchar(128) NOT NULL,
    `rollback_info` longblob NOT NULL,
    `log_status` int(11) NOT NULL,
    `log_created` datetime NOT NULL,
    `log_modified` datetime NOT NULL,
    `ext` varchar(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    

    1. 启动TC(事务协调器)

    下载seata服务器后,在命令行中执行/bin/seata-server.bat -p 8888 -m file
    注:其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。

    2. 创建注册中心

    application.yml

    spring:
        application:
            name: seata-demo-discovery
    
    
    server:
        port: 56080 #启动端口
    
    
    
    eureka:
      server:
        enable-self-preservation: false    #关闭服务器自我保护,客户端心跳检测15分钟内错误达到80%服务会保护,导致别人还认为是好用的服务
        eviction-interval-timer-in-ms: 10000 #清理间隔(单位毫秒,默认是60*1000)5秒将客户端剔除的服务在服务注册列表中剔除# 
        shouldUseReadOnlyResponseCache: true #eureka是CAP理论种基于AP策略,为了保证强一致性关闭此切换CP 默认不关闭 false关闭
        response-cache-update-interval-ms: 3000  ##eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上 #eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上默认30s
        response-cache-auto-expiration-in-seconds: 180   ##eureka server缓存readWriteCacheMap失效时间,这个只有在这个时间过去后缓存才会失效,失效前不会更新,过期后从registry重新读取注册服务信息,registry是一个ConcurrentHashMap。
      client: 
        register-with-eureka: false  #false:不作为一个客户端注册到注册中心
        fetch-registry: false      #为true时,可以启动,但报异常:Cannot execute request on any known server
        instance-info-replication-interval-seconds: 10 
        serviceUrl: 
          defaultZone: http://localhost:${server.port}/eureka/
      instance:
        hostname: ${spring.cloud.client.ip-address}
        prefer-ip-address: true
        instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
        lease-renewal-interval-in-seconds: 5    ## 续约更新时间间隔(默认30秒)
        lease-expiration-duration-in-seconds: 10 # 续约到期时间(默认90秒)
        
        
        
    logging: 
      config: classpath:log4j2-dev.xml
    
    @SpringBootApplication
    @EnableEurekaServer
    public class DiscoveryServer {
    
        public static void main(String[] args) {
            SpringApplication.run(DiscoveryServer.class, args);
    
        }
    }
    

    3. dtx-seata-demo-bank1

    dtx-seata-demo-bank1实现如下功能:

    1. 张三账户减少金额,开启全局事务
    2. 远程调用bank2向李四转账。
      2.1 DAO
    @Mapper
    @Component
    public interface AccountInfoDao {
    //更新账户金额
    @Update("update account_info set account_balance = account_balance + #{amount} where
    account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double
    amount);
    }
    

    2.2 FeignClient
    远程调用bank2的客户端

    @FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class)
    public interface Bank2Client {
    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
    }
    

    失败返回

    @Component
    public class Bank2ClientFallback implements Bank2Client{
      @Override
      public String transfer(Double amount) {
        return "fallback";
      }
    }
    

    2.3 Service

    @Service
    public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
      @Autowired
      AccountInfoDao accountInfoDao;
      @Autowired
      Bank2Client bank2Client;
      //张三转账
      @Override
      @GlobalTransactional
      @Transactional
      public void updateAccountBalance(String accountNo, Double amount) {
        logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
        //张三扣减金额
        accountInfoDao.updateAccountBalance(accountNo,amount*‐1);
        //向李四转账
        String remoteRst = bank2Client.transfer(amount);
        //远程调用失败
        if(remoteRst.equals("fallback")){
            throw new RuntimeException("bank1 下游服务异常");
          } 
        //人为制造错误
        if(amount==3){
            throw new RuntimeException("bank1 make exception 3");
        }
      }
    }
    

    将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务:GlobalTransactionalInterceptor会拦截@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。

    2.4 Controller

    @RestController
    public class Bank1Controller {
        @Autowired
        AccountInfoService accountInfoService;
        //转账
        @GetMapping("/transfer")
        public String transfer(Double amount){
            accountInfoService.updateAccountBalance("1",amount);
        return "bank1"+amount;
        }
    }
    
    1. dtx-seata-demo-bank2
      dtx-seata-demo-bank2实现如下功能:
      1、李四账户增加金额。
      dtx-seata-demo-bank2在本账号事务中作为分支事务不使用@GlobalTransactional。

    3.1 DAO

    @Mapper
    @Component
    public interface AccountInfoDao {
        //向李四转账
        @Update("UPDATE account_info SET account_balance = account_balance + #   {amount} WHERE  account_no = #{accountNo}")
        int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    }
    

    3.2 Service

    @Service
    public class AccountInfoServiceImpl implements AccountInfoService {
        private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
        @Autowired
        AccountInfoDao accountInfoDao;
        @Override
        @Transactional
        public void updateAccountBalance(String accountNo, Double amount) {
            logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
            //李四增加金额
            accountInfoDao.updateAccountBalance(accountNo,amount);
            //制造异常
            if(amount==2){
                throw new RuntimeException("bank1 make exception 2");
            }
        }
    }
    

    3.3 Controller

    @RestController
    public class Bank2Controller {
        @Autowired
        AccountInfoService accountInfoService;
        @GetMapping("/transfer")
        public String transfer(Double amount){
            accountInfoService.updateAccountBalance("2",amount);
        return "bank2"+amount;
      }
    }
    

    相关文章

      网友评论

          本文标题:分布式事务解决方案之2PC(二)

          本文链接:https://www.haomeiwen.com/subject/gcteaktx.html