网上很多资料或组件都是完全重新实现事务管理,而本文选择改造Spring事务的提交过程,这样在即简化了集成成本,又不会因事务管理代码出现Bug导致系统没有满足事务特性(ACID),即如果在一次微服务的调用过程中所有的Spring的事务都提交或撤销了,整个系统也满足ACID。
说起事务本身就挺麻烦的,在单点应用中事务由数据库来实现,但却是个业务层概念。Spring通过线程存储共享事务对象实现了业务层事务。网上常见的分布式事务协议有2PC、3PC与TCC,其中TCC对业务层侵入性强,对业务设计改动比较大,不符合前置设想。
2PC/3PC协议是资源层协议,比较适用于数据库直接互相沟通事务,我们可以借助于Spring的业务层事务方式实现业务层的分布式事务。3PC第一个阶段要求所有参与者达成一致后再进行事务操作,即在一开始要求所有参与者都已就位,而微服务的链式调用是同步的,参与者是逐步增加的,所以3PC在微服务场景基本不可实现。本文选择实现2PC协议,2PC协议也是异步的,但2PC不要求所有参与者一开始就位,只要在阶段2之前就位即可,同步可以看做是2PC的异步的一种特殊的情况。
一致性与原子性
一致性这个词在各种资料中算是歧义特别大了,原子性反而出奇的统一。
原子性目前有两种意思,一个是多线程中操作是不可分的,另一个是事务中所有的操作要么都成功,要么全都失败。这两种意思语境很少重复,所以提到原子性基本不会有歧义。
一致性有分布式系统的一致性、事务的一致性、一致性Hash等解释,事务的一致性简单来说就是事务前后数据库的完整性约束不能破坏,它是由事务的其他3个特性共同保证的。
分布式系统中的一致性一般指的是CAP理论中的C,它的定义是所有节点的数据是相同的,又有人把这个定义称为一致性状态,而在分布式系统中协商达成一致性状态的协议算法为一致性协议,2PC就是一个一致性协议。
当事务和分布式系统有交集时,一致性这个词就彻底乱套了,甚至和原子性搅在了一起,网上铺天盖地的最终一致性的资料很多其实是事务的最终原子性。
本文中一致性从CAP理论取义为
所有节点的数据是相同的
,英文为"all nodes see the same data at the same time"
。
因为分布式系统天然有事务的隔离性,事务的持久性由数据库保证,因此我们只要在Spring事务基础上实现分布式事务的原子性,就进而实现了分布式事务的一致性。
即,只要在数据库事务的基础上实现分布式事务的原子性,就实现了分布式事务的ACID4个特性。因此在后文中用原子性指代分布式事务的整体4个特性。
因Spring的事务使用导致的事务一致性错误不在本文讨论范围内。
对于2PC协议,一致性状态为所有节点对事务的提交或回滚达成一致,一致性的结果是事务的原子性。因此2PC协议的一致性和分布式事务的一致性可以看做是等价的。
本文实现与2PC协议的差异性
2PC的各个参与者是平等,互相隔离的。但微服务不是,微服务是有调用与被调用关系的,被调用者的异常会天然返回给调用者并自然引起调用者的连锁回滚。
2PC协议要求服务恢复后能回到事务中,但这个能力只有数据库自己有,Spring的事务提交前宕机,数据库会自己回滚。因此本文实现的2PC协议会在任何一个参与者宕机后整体回滚,此外还有在一致提交时,业务宕机也可能导致的原子性失败。
由于无法保证业务宕机恢复后重试事务的上下文与宕机前一致,所以这个问题不能简单的在技术上解决,必须要业务配合。比如订单、仓储业务,当用户支付订单后仓储服务提交事务前宕机,仓储服务恢复后并不确定还有没有货,如果没货,只能在业务中退款。
协调者宕机异常
协调者正常的前提下,参与者宕机,我们可以简单的回滚事务才达成一致性。如果协调者宕机且没有Fail Over,这种情况需要详细分析。
如果协调者在阶段一宕机,由于微服务的同步性,整个调用栈会在某个业务阻塞等待进行阶段一投票,与2PC协议相同。即协调者在阶段一宕机会导致业务阻塞但不会造成不一致。
如果协调者阶段二宕机,此时同步调用已经完成,等待提交的消息。如果等待超时,也会导致参与者回滚。所以本文的实现,在阶段二协调者和参与者任一宕机都可能导致不一致。
协调者
本文采用Redis充当协调者,但Redis主从切换可能导致数据不一致,上一节中讨论阶段一协调者宕机不会造成不一致,我们要在设计时弥补这一可能性。
协调者协议实现
设计的存储结构如下
键 | 数据类型 | 描述 |
---|---|---|
cloud-transaction/事务ID/state | Long | 事务存在标志, 值为0时表示事务异常,后续的参与者自行回滚 值为1表示阶段1,值为2表示阶段2 |
cloud-transaction/事务ID/result | String | 事务的最终结果,是COMMIT还是ROLLBACK |
cloud-transaction/事务ID/notice | String | 协调者向参与者的通知订阅KEY |
cloud-transaction/事务ID/vote | Hash[参与者,状态] | 投票记录, HKEY为参与者ID,HValue为投票状态 |
cloud-transaction/事务ID/ack | Hash[参与者,状态] | 执行记录, HKEY为参与者ID,HValue为执行状态 |
正常流程为:
- 业务的开始方为watchdog,负责创建事务ID,写入state为1,并在微服务之间传递事务ID
- 判断state值是否为1,若是将Spring事务的执行结果写入投票记录,否则自行ROLLBACK
- 调用栈返回至watchdog处,此时可以获取到调用栈是否有异常,同时与投票记录互补
- watchdog写state为2,同时将结果写入result
- watchdog在notice上广播结果,并设置各键值的过期时间
- 各参与者收到通知后执行结果,并写入执行记录
为防范1-3步(阶段1)Redis或参与者异常,在步骤4做如下设定:
- 调用栈或投票记录只要有一个为ROLLBACK,整体ROLLBACK
- state必须为1或键不存在
即如果Redis异常未能恢复或参与者业务异常,调用栈必然异常。反之调用栈无异常,则表明无业务异常并且Redis最终正常(最终一致性的最终),可通过校验state值判断vote值的可信度。当参与者超时后可以从state和result综合判断是否应该提交事务。有的Redis设置Master宕机时,Slave可以读数据,这种设置最终会在各事务超时回滚。
这么多问题这东西还能用么
标准2PC的协调者是没有存储的,宕机再恢复需要从各参与者获取事务数据。本实现用的是Redis,Redis还是挺可靠的,还可以主从顶一顶。如果系统压力没那么大,负载并不高,用起来是没啥问题的。并且本实现还支持事务重用,即当微服务调用兜兜转转又回来的时候,事务是重用的,可以解决隔离性问题。
为什么要造轮子
- 纸上得来终觉浅
- 找工作简历投出去就没影了也很苦恼啊
关键代码
在微服务之间共享事务ID
通过RequestInterceptor为请求微服务之间的调用增加一个http头,这样可以方便的传递事务ID。同时如果事务创建时没有这个http头,那么当前业务就处在微服务栈的栈底
@Component
public class CloudTransactionIdFeignInterceptor implements RequestInterceptor {
private static final String REQUEST_ATTRIBUTE_TRANSACTION_ID = "X-TRANSACTION-ID";
private static final String REQUEST_HEADER_TRANSACTION_ID = "X-TRANSACTION-ID";
public static HttpServletRequest getCurrentRequest() {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.currentRequestAttributes();
return attributes.getRequest();
}
@Override
public void apply(RequestTemplate template) {
try {
HttpServletRequest currentRequest = getCurrentRequest();
Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
if (attribute != null) {
template.header(REQUEST_HEADER_TRANSACTION_ID, attribute.toString());
} else {
String header = currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
if (StringUtils.hasText(header)) {
template.header(REQUEST_HEADER_TRANSACTION_ID, header);
}
}
} catch (Throwable e) {
//不能影响正常的流程运行
}
}
//获取事务ID
public static String getCloudTransactionId() {
HttpServletRequest currentRequest = getCurrentRequest();
Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
if (attribute != null)
return attribute.toString();
return currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
}
//广播事务ID
public static void broadcastCloudTransactionId(String id) {
getCurrentRequest().setAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID, id);
}
}
拦截Spring事务的提交
Spring通过PlatformTransactionManager
Bean对象管理事务,我们实现一个Wrapper把Spring的Bean包起来,拦截关键调用。具体代码见类 WrappedDataSourceTransactionManager
。
对Spring 事务的 ThreadLocal数据的处理
private String doMoveThreadData(String id) {
TransactionThreadData data = new TransactionThreadData();
data.resources = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
data.synchronizations = TransactionSynchronizationManager.getSynchronizations();
data.currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
data.currentTransactionReadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
data.currentTransactionIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
data.actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
for (Object key : data.resources.keySet()) {
TransactionSynchronizationManager.unbindResource(key);
}
TransactionSynchronizationManager.clear();
dataMap.put(id, data);
return id;
}
public boolean restoreThreadData(String id) {
TransactionThreadData data = dataMap.get(id);
dataMap.remove(id);
if (data == null)
return false;
for (Map.Entry<Object, Object> entry : data.resources.entrySet()) {
TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
}
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : data.synchronizations) {
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
TransactionSynchronizationManager.setCurrentTransactionName(data.currentTransactionName);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(data.currentTransactionReadOnly);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(data.currentTransactionIsolationLevel);
TransactionSynchronizationManager.setActualTransactionActive(data.actualTransactionActive);
return true;
}
完整代码见 类 TransactionThreadDataContainer
。
微服务饶了一圈又回来,继续用未提交的事务
Spring事务是基于ThreadLocal的,只要把ThreadLocal搬过来就可以了
public static class LoadBalancerFeignClientWrapper implements Client {
private LoadBalancerFeignClient wrapped;
private WrappedDataSourceTransactionManager transactionManager;
public LoadBalancerFeignClientWrapper(Client delegate,
CachingSpringLoadBalancerFactory lbClientFactory,
SpringClientFactory clientFactory, PlatformTransactionManager transactionManager) {
wrapped = new LoadBalancerFeignClient(delegate, lbClientFactory, clientFactory);
this.transactionManager = (WrappedDataSourceTransactionManager)transactionManager;
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
//这里是调用其他微服务,调用的时候把TLS剥离出来
//调用完毕或异常时再恢复回来
//剥离后的数据就可以安放到其他的线程,达到重用事务的目的
transactionManager.stealTransactionThreadData();
Response response = wrapped.execute(request, options);
transactionManager.returnTransactionThreadData();
return response;
} catch (Throwable e) {
transactionManager.returnTransactionThreadData();
throw e;
}
}
}
超时
超时使用的是reactor-core
的Mono
,详细见Reactor 3 Reference Guide
//超时设置
Mono.delay(Duration.ofMillis(maxWaitTime))
.map(t -> onTransactionTimeout(transactionId))
.publishOn(Schedulers.parallel())
.subscribe();
用的不是timeout而是delay,所以onTransactionTimeout
一定会执行,检测事务的结果。
阶段2结果通告
用的时Redis的订阅发布功能。
网友评论