近来心情很复杂,又忙于各种生计,所以一直没有更新。但觉得没有很干货的内容,干嘛要更新,很多公众号作者是不是就打广告,硬要每天一更,不管什么内容都往里塞,不摇碧莲。
至于什么是分布式事务,什么是ACID,什么是Base理论,什么是Cap定理,什么是2PC/3PC这里不说了,这些概念总是看一次忘记一次,时间一长就糊糊了。
一、环境要求
- JDK 1.8
- Maven
- Docker
- springboot 2.1.4
- springcloud eureka
二、架构图
3.png
三、创建saga数据库
CREATE DATABASE IF NOT EXISTS saga;
use saga;
CREATE TABLE IF NOT EXISTS TxEvent (
surrogateId bigint NOT NULL AUTO_INCREMENT,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
type varchar(50) NOT NULL,
compensationMethod varchar(512) NOT NULL,
expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
payloads blob,
retries int(11) NOT NULL DEFAULT '0',
retryMethod varchar(512) DEFAULT NULL,
PRIMARY KEY (surrogateId),
INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime),
INDEX saga_global_tx_index (globalTxId)
) DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS Command (
surrogateId bigint NOT NULL AUTO_INCREMENT,
eventId bigint NOT NULL UNIQUE,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
compensationMethod varchar(512) NOT NULL,
payloads blob,
status varchar(12),
lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
version bigint NOT NULL,
PRIMARY KEY (surrogateId),
INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, status)
) DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS TxTimeout (
surrogateId bigint NOT NULL AUTO_INCREMENT,
eventId bigint NOT NULL UNIQUE,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
type varchar(50) NOT NULL,
expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
status varchar(12),
version bigint NOT NULL,
PRIMARY KEY (surrogateId),
INDEX saga_timeouts_index (surrogateId, expiryTime, globalTxId, localTxId, status)
) DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS tcc_global_tx_event (
surrogateId bigint NOT NULL AUTO_INCREMENT,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
txType varchar(12),
status varchar(12),
creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (surrogateId),
UNIQUE INDEX tcc_global_tx_event_index (globalTxId, localTxId, parentTxId, txType)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS tcc_participate_event (
surrogateId bigint NOT NULL AUTO_INCREMENT,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
confirmMethod varchar(512) NOT NULL,
cancelMethod varchar(512) NOT NULL,
status varchar(50) NOT NULL,
creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (surrogateId),
UNIQUE INDEX tcc_participate_event_index (globalTxId, localTxId, parentTxId)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS tcc_tx_event (
surrogateId bigint NOT NULL AUTO_INCREMENT,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
methodInfo varchar(512) NOT NULL,
txType varchar(12),
status varchar(12),
creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (surrogateId),
UNIQUE INDEX tcc_tx_event_index (globalTxId, localTxId, parentTxId, txType)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS master_lock (
serviceName varchar(36) not NULL,
expireTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
lockedTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
instanceId varchar(255) not NULL,
PRIMARY KEY (serviceName)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
四、获取取源码进行编译
- 在github上获取源码
https://github.com/apache/servicecomb-pack.git
cd servicecomb-pack
- 编译,为了用eureka注册中心,mvn打包时需要加上-Pspring-cloud-eureka, 同时将其编译为docker镜像
mvn clean install -DskipTests -Pdocker -Pspring-cloud-eureka
-
编译成功后docker images看一下
1.png
☁ saga [master] ⚡ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
pack-web 0.6.0-SNAPSHOT 64a3ae350741 2 days ago 148MB
alpha-server 0.6.0-SNAPSHOT d87009adb609 2 days ago 245MB
<none> <none> bb1c5e40b598 3 days ago 245MB
phpstorm_helpers PS-192.7142.51 15846b417255 7 days ago 1.34MB
redis latest f9b990972689 12 days ago 104MB
swoft/swoft latest de9b89f8617c 13 days ago 546MB
rabbitmq management fa535c4b51fe 2 weeks ago 181MB
mysql latest a7a67c95e831 2 weeks ago 541MB
postgres latest 0f10374e5170 2 weeks ago 314MB
idoop/docker-apollo latest d81a0e918b66 3 weeks ago 268MB
busybox latest be5888e67be6 4 weeks ago 1.22MB
hub.yuyuda.com:8888/swoft/my-swoft latest 7c74778773f0 4 weeks ago 561MB
hub.yuyuda.com:8888/php/my-php latest b20483c2a2b7 6 weeks ago 564MB
hub.yuyuda.com:8888/nginx/my-nginx latest bcccbce3dd42 6 weeks ago 127MB
percona/pmm-server latest ada7dbc5a8bc 3 months ago 1.15GB
openjdk 8-jre-alpine f7a292bbb70c 12 months ago 84.9MB
☁ saga [master] ⚡
注:有可能一次不能下载完,多试几次。
五、启动saga服务
- docker方式启动alpha-server
docker stop my-alpha
docker rm my-alpha
docker run -d \
--name my-alpha \
--restart always \
-p 8080:8080 \
-p 8090:8090 \
-e "JAVA_OPTS=-Dspring.profiles.active=mysql -Dspring.datasource.url=jdbc:mysql://10.99.50.182:3666/saga?useSSL=false -Dspring.datasource.username=root -Dspring.datasource.password=123456 -Deureka.client.enabled=true -Deureka.client.service-url.defaultZone=http://10.99.50.182:8761/eureka" \
alpha-server:0.6.0-SNAPSHOT
- java方式启动
# 在目录/home/xianzi/java/servicecomb-pack/alpha/alpha-server/target/saga
java -jar alpha-server-0.6.0-SNAPSHOT-exec.jar \
--server.port=8090 \
--alpha.server.port=8080 \
--spring.datasource.url="jdbc:mysql://10.99.50.182:3666/saga?useSSL=false" \
--spring.datasource.username=root \
--spring.datasource.password=123456 \
--eureka.client.enabled=true \
--eureka.client.service-url.defaultZone=http://10.99.50.182:8761/eureka \
--spring.profiles.active=mysql
注:eureka服务自己搞定,很简单的
六、omega端
- pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.yuyuda.saga</groupId>
<artifactId>saga</artifactId>
<version>1.0</version>
<name>saga</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
<pack.version>0.5.0</pack.version>
<mybatis-pulus.version>3.1.2</mybatis-pulus.version>
<alibaba-druid.version>1.1.18</alibaba-druid.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<!--servicecomb-pack omega依赖-->
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
<artifactId>omega-spring-starter</artifactId>
<version>${pack.version}</version>
<exclusions>
<exclusion>
<artifactId>objenesis</artifactId>
<groupId>org.objenesis</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
<artifactId>omega-spring-cloud-eureka-starter</artifactId>
<version>${pack.version}</version>
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
<artifactId>omega-transport-resttemplate</artifactId>
<version>${pack.version}</version>
</dependency>
<!--servicecomb-pack omega依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.50</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-pulus.version}</version>
</dependency>
<!--阿里巴巴连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${alibaba-druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
2、注意事项
# com.google.guava、org.objenesis存在重复依赖,需要将这些 依赖排除掉
3、yaml配置,我用的是spring-cloud-config配置中心,在gitlab上
3.1、saga-dev.yml
server:
port: 8092
spring:
application:
name: saga
#数据库配置
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${MYSQL_HOST:10.99.50.182}:${MYSQL_PORT:3666}/${MYSQL_DATABASE:saga}?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
username: ${MYSQL_USERNAME:root}
password: ${MYSQL_PASSWORD:123456}
type: com.alibaba.druid.pool.DruidDataSource
#最大活跃数
maxActive: 20
#初始化数量
initialSize: 1
#最大连接等待超时时间
maxWait: 60000
#打开PSCache,并且指定每个连接PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
#通过connectionProperties属性来打开mergeSql功能;慢SQL记录
#connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
minIdle: 1
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
#配置监控统计拦截的filters,去掉后监控界面sql将无法统计,'wall'用于防火墙 , 通过http://localhost:8081/druid/index.html 访问druid管理界面
# 'wall' 和 flyway 冲突无法执行sql脚本,关闭防火墙
filters: stat
alpha:
cluster:
register:
type: eureka
address: 10.99.50.182:8080
env:
dev
mybatis-plus:
# 如果是放在src/main/java目录下 classpath:/com/yourpackage/*/mapper/*Mapper.xml
# 如果是放在resource目录 classpath:/mapper/*Mapper.xml
mapper-locations: classpath:/mapper/*Mapper.xml
#实体扫描,多个package用逗号或者分号分隔
typeAliasesPackage: com.yuyuda.saga.entity
banner: false
3.2、bootstrap.yml
spring:
application:
name: saga
cloud:
config:
discovery:
enabled: true
service-id: CONFIG
profile: dev
loadbalancer:
retry:
enabled: true
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
4、测试实战
4.1、建立测试数据表
CREATE TABLE `testa` (
`id` int NOT NULL AUTO_INCREMENT,
`service` varchar(128) DEFAULT NULL,
`vstatus` varchar(128) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `testb` (
`id` int NOT NULL AUTO_INCREMENT,
`service` varchar(128) DEFAULT NULL,
`vstatus` varchar(128) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `testc` (
`id` int NOT NULL AUTO_INCREMENT,
`service` varchar(128) DEFAULT NULL,
`vstatus` varchar(128) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4.2、注解说明
-
SagaStart注解:saga事务开始
-
Compensable注解:补偿注解,确定补偿方法
-
Transactional注解:本地事务注解
4.3、代码演示
i. Controller
package com.yuyuda.saga.controller;
import com.yuyuda.saga.entity.Testa;
import com.yuyuda.saga.entity.Testb;
import com.yuyuda.saga.service.a.AService;
import com.yuyuda.saga.service.b.BService;
import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
@Controller
public class AController {
@Autowired
private AService aService;
@Autowired
private BService bService;
@SagaStart
@PostMapping("/saga_start")
public String test() throws Exception {
Testa testa = aService.insertA(new Testa());
if (null != testa) {
bService.insertB(new Testb());
} else {
throw new Exception("异常");
}
System.out.println("测试一啥");
return "abc";
}
}
ii.AService
package com.yuyuda.saga.service.a;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yuyuda.saga.entity.Testa;
import com.yuyuda.saga.mapper.TestaMapper;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class AService {
@Autowired
private TestaMapper testaMapper;
@Compensable(compensationMethod = "cancel")
@Transactional(rollbackFor = Exception.class)
public Testa insertA(Testa testa) {
testa.setService("service a");
testa.setVstatus("1");
int insert = testaMapper.insert(testa);
if (insert > 0) {
return testa;
}
return null;
}
public void cancel(Testa testa) {
QueryWrapper<Testa> wrapper = new QueryWrapper<>();
wrapper.eq("service", "service a");
testaMapper.delete(wrapper);
}
}
iii.BService
package com.yuyuda.saga.service.b;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yuyuda.saga.entity.Testb;
import com.yuyuda.saga.mapper.TestbMapper;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class BService {
@Autowired
private TestbMapper testbMapper;
@Compensable(compensationMethod = "cancel")
@Transactional(rollbackFor = Exception.class)
public Testb insertB(Testb testb) throws Exception {
testb.setService("service b");
testb.setVstatus("1");
int insert = testbMapper.insert(testb);
if (insert > 0) {
return testb;
}
return null;
}
public void cancel(Testb testb) {
QueryWrapper<Testb> wrapper = new QueryWrapper<>();
wrapper.eq("service", "service b");
testbMapper.delete(wrapper);
}
}
iv: 模拟异常情况
package com.yuyuda.saga.service.b;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yuyuda.saga.entity.Testb;
import com.yuyuda.saga.mapper.TestbMapper;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class BService {
@Autowired
private TestbMapper testbMapper;
@Compensable(compensationMethod = "cancel")
@Transactional(rollbackFor = Exception.class)
public Testb insertB(Testb testb) throws Exception {
testb.setService("service b");
testb.setVstatus("1");
int insert = testbMapper.insert(testb);
throw new Exception("服务B异常");
}
public void cancel(Testb testb) {
QueryWrapper<Testb> wrapper = new QueryWrapper<>();
wrapper.eq("service", "service b");
testbMapper.delete(wrapper);
}
}
5、在saga数据表中的结果
5.1、正常执行模式,在TxEvent表中写入了如下数据
2.png
i. 成功事件类型
SagaStartedEvent
TxStartedEvent
TxEndedEvent
TxStartedEvent
TxEndedEvent
SagaEndedEvent
ii. 图解
4.png
5.2、异常执行模式,在TxEvent表中写入了如下数据
6.png
i. 异常事件类型
SagaStartedEvent
TxStartedEvent
TxEndedEvent
TxStartedEvent
TxAbortedEvent
TxAbortedEvent
TxCompensatedEvent
ii.图解
5.png
iii. 异常情况还会往Command表中写入数据
7.png
6、测试数据表演示
6.1、正常情况
8.png
6.2、模拟异常后,补偿情况
public void cancel(Testa testa) {
QueryWrapper<Testa> wrapper = new QueryWrapper<>();
wrapper.eq("service", "service a");
testaMapper.delete(wrapper);
}
2020-05-14 14:44:52.538 INFO
28142 --- [ault-executor-0] s.p.o.c.g.s.GrpcCompensateStreamObserver : Received compensate command,
global tx id: eb125a8d-fa97-4cf3-bcf3-fb7fb3b3a595, local tx id: 108afbb3-8f9e-48c5-985d-6f201268226e, compensation method: public void com.yuyuda.saga.service.a.AService.cancel(com.yuyuda.saga.entity.Testa)
2020-05-14 14:44:52.683 INFO
28142 --- [ault-executor-0] o.a.s.p.omega.context.CallbackContext
: Callback transaction with global tx id [eb125a8d-fa97-4cf3-bcf3-fb7fb3b3a595],
local tx id [108afbb3-8f9e-48c5-985d-6f201268226e]
9.png
已按补偿方法删除
清风席卷了满含沧桑的脸盘,却还是剪不去愁容,带来的是形单影只的影子,撵不走积压在心灵上寒酸。孤独、平静、少言寡语、傻乎乎,一直在清除心坎里的债务,但债务长时积累,沾了厚厚的尘埃,刮起来有点痛。追求的路仍在不停的清洗,走出去了好远,发现路没有清洗干净,又回来复盘,又继续走,继续复盘,但终点的光点却越来越小。
网友评论