上一篇文章对2PC及其解决方案做了一些介绍,这次就通过Seata来实现一下2PC事务
本示例通过Seata中间件实现分布式事务,模拟三个账户的转账交易过程。
两个账户在三个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个个微服务。交易过程是,张三给李四转账指定金额。
bank1库,包含张三账户
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
bank2库,包含李四账户
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);
undo_log是记录undo和redo日志的,在第二阶段进行异常的时候进行回滚
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
1. 启动TC(事务协调器)
下载seata服务器后,在命令行中执行/bin/seata-server.bat -p 8888 -m file
注:其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。
2. 创建注册中心
application.yml
spring:
application:
name: seata-demo-discovery
server:
port: 56080 #启动端口
eureka:
server:
enable-self-preservation: false #关闭服务器自我保护,客户端心跳检测15分钟内错误达到80%服务会保护,导致别人还认为是好用的服务
eviction-interval-timer-in-ms: 10000 #清理间隔(单位毫秒,默认是60*1000)5秒将客户端剔除的服务在服务注册列表中剔除#
shouldUseReadOnlyResponseCache: true #eureka是CAP理论种基于AP策略,为了保证强一致性关闭此切换CP 默认不关闭 false关闭
response-cache-update-interval-ms: 3000 ##eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上 #eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上默认30s
response-cache-auto-expiration-in-seconds: 180 ##eureka server缓存readWriteCacheMap失效时间,这个只有在这个时间过去后缓存才会失效,失效前不会更新,过期后从registry重新读取注册服务信息,registry是一个ConcurrentHashMap。
client:
register-with-eureka: false #false:不作为一个客户端注册到注册中心
fetch-registry: false #为true时,可以启动,但报异常:Cannot execute request on any known server
instance-info-replication-interval-seconds: 10
serviceUrl:
defaultZone: http://localhost:${server.port}/eureka/
instance:
hostname: ${spring.cloud.client.ip-address}
prefer-ip-address: true
instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
lease-renewal-interval-in-seconds: 5 ## 续约更新时间间隔(默认30秒)
lease-expiration-duration-in-seconds: 10 # 续约到期时间(默认90秒)
logging:
config: classpath:log4j2-dev.xml
@SpringBootApplication
@EnableEurekaServer
public class DiscoveryServer {
public static void main(String[] args) {
SpringApplication.run(DiscoveryServer.class, args);
}
}
3. dtx-seata-demo-bank1
dtx-seata-demo-bank1实现如下功能:
- 张三账户减少金额,开启全局事务。
- 远程调用bank2向李四转账。
2.1 DAO
@Mapper
@Component
public interface AccountInfoDao {
//更新账户金额
@Update("update account_info set account_balance = account_balance + #{amount} where
account_no = #{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double
amount);
}
2.2 FeignClient
远程调用bank2的客户端
@FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class)
public interface Bank2Client {
@GetMapping("/bank2/transfer")
String transfer(@RequestParam("amount") Double amount);
}
失败返回
@Component
public class Bank2ClientFallback implements Bank2Client{
@Override
public String transfer(Double amount) {
return "fallback";
}
}
2.3 Service
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
Bank2Client bank2Client;
//张三转账
@Override
@GlobalTransactional
@Transactional
public void updateAccountBalance(String accountNo, Double amount) {
logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
//张三扣减金额
accountInfoDao.updateAccountBalance(accountNo,amount*‐1);
//向李四转账
String remoteRst = bank2Client.transfer(amount);
//远程调用失败
if(remoteRst.equals("fallback")){
throw new RuntimeException("bank1 下游服务异常");
}
//人为制造错误
if(amount==3){
throw new RuntimeException("bank1 make exception 3");
}
}
}
将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务:GlobalTransactionalInterceptor会拦截@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。
2.4 Controller
@RestController
public class Bank1Controller {
@Autowired
AccountInfoService accountInfoService;
//转账
@GetMapping("/transfer")
public String transfer(Double amount){
accountInfoService.updateAccountBalance("1",amount);
return "bank1"+amount;
}
}
- dtx-seata-demo-bank2
dtx-seata-demo-bank2实现如下功能:
1、李四账户增加金额。
dtx-seata-demo-bank2在本账号事务中作为分支事务不使用@GlobalTransactional。
3.1 DAO
@Mapper
@Component
public interface AccountInfoDao {
//向李四转账
@Update("UPDATE account_info SET account_balance = account_balance + # {amount} WHERE account_no = #{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
}
3.2 Service
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
@Autowired
AccountInfoDao accountInfoDao;
@Override
@Transactional
public void updateAccountBalance(String accountNo, Double amount) {
logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
//李四增加金额
accountInfoDao.updateAccountBalance(accountNo,amount);
//制造异常
if(amount==2){
throw new RuntimeException("bank1 make exception 2");
}
}
}
3.3 Controller
@RestController
public class Bank2Controller {
@Autowired
AccountInfoService accountInfoService;
@GetMapping("/transfer")
public String transfer(Double amount){
accountInfoService.updateAccountBalance("2",amount);
return "bank2"+amount;
}
}
网友评论