目录
目录.png分布式事务解决方案
- 长事务: saga
- 短事务: 设计的时候尽量短事务,能不用分布式事务尽量不用,分布式事务影响性能,也要求业务端保证补偿接口幂等。比如booking + kafka保证分布式事务,可以用掉booking时插入mysql,如果正常结束给个正常status,如果rpc异常结束,后续可以线程扫描进行补偿以达到最终一致性。
分布式事务应用场景
刚性事务xa
- 适用于需要强一致性的业务,并发性不高,需要数据库支持
- seata的at是柔性事务,第二阶段可以异步,无业务代码入侵,基于sql生成逆向sql,第二阶段可异步
柔性事务
seata的at
- Seata 的 AT 模式基于本地事务的特性,通过拦截并解析 SQL 的方式,记录自定义的回滚日志
- at支持mysql tidb之类的但是redis es这种就不支持, 需要表有主键
- 业务0入侵,如果只是sql的二阶段commit/rollback很适合
最大努力送达
- 实现有本地消息表之类的
- 适合用户注册成功后发送邮件短信、电商系统给用户发送优惠券、不同系统数据同步等需要保证最终一致性的场景(同步加异步)
tcc
- 适合金融核心系统的特点是一致性要求高(业务上的隔离性)、短流程、并发高
saga
- 业务流程长、业务流程多;参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口;典型业务系统:如金融网路(与外部机构对接)、互联网微贷、渠道整合、分布式架构下服务集成等业务系统
saga服务端或者saga客户端重启或者宕机
- saga服务端出问题:
本地消息补偿 - saga客户端宕机或者重启
根据记录状态的消息进行相应补偿,要求客户端调用的被调用方业务接口幂等,补偿业务和saga服务,然后saga服务再进行补偿 - 普通业务场景下客户端宕机或者重启
本地消息补偿,事件开始时就记录,定时任务扫描看状态进行补偿,同样要求被调用方业务接口幂等
基于seata分布式事务 vs servicecomb
- seata的saga需要熟悉状态机那块的写法,servicecomb的saga可以用注解实现saga
分布式事务 servicecomb的saga原理
-
这里使用官方git的demo, booking -> car -> hotel
booking -> car -> hotel.png
最开始调用时先经过sagastart注解
sagastart注解流程.png- SagaStartAspect
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
initializeOmegaContext();
if(context.getAlphaMetas().isAkkaEnabled() && sagaStart.timeout()>0){
SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
return wrapper.apply(joinPoint,sagaStart,context);
}else{
SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
return wrapper.apply(joinPoint,sagaStart,context);
}
}
private void initializeOmegaContext() {
context.setLocalTxId(context.newGlobalTxId());
}
- SagaStartAnnotationProcessor
public AlphaResponse preIntercept(int timeout) {
try {
return sender
.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
}
SagaStartAnnotationProcessor远程调用alpha步骤1.2.3
远程调用alpha步骤1.2.3.png远程调用alpha流程图.png
- SagaLoadBalanceSender extends LoadBalanceSenderAdapter
@Override
public AlphaResponse send(TxEvent event) {
do {
final SagaMessageSender messageSender = pickMessageSender();
Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new SenderExecutor<TxEvent>() {
@Override
public AlphaResponse apply(TxEvent event) {
return messageSender.send(event);
}
});
if (response.isPresent()) return response.get();
} while (!Thread.currentThread().isInterrupted());
throw new OmegaException("Failed to send event " + event + " due to interruption");
}
LoadBalanceSenderAdapter
// 轮询做负载
public <T> T pickMessageSender() {
return (T) senderPicker.pick(loadContext.getSenders(),
loadContext.getGrpcOnErrorHandler().getGrpcRetryContext().getDefaultMessageSender());
}
// 统一模板方法,回调子类自己的方法
public <T> Optional<AlphaResponse> doGrpcSend(MessageSender messageSender, T event, SenderExecutor<T> executor) {
AlphaResponse response = null;
try {
long startTime = System.nanoTime();
response = executor.apply(event);
loadContext.getSenders().put(messageSender, System.nanoTime() - startTime);
} catch (OmegaException e) {
throw e;
} catch (Exception e) {
LOG.error("Retry sending event {} due to failure", event, e);
loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
}
return Optional.fromNullable(response);
}
booking调用car业务代码时@Compensable注解
booking调用car业务代码.png- TransactionAspect
@Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// just check if we need to setup the transaction context information first
TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
if (transactionContext != null) {
populateOmegaContext(context, transactionContext);
}
// SCB-1011 Need to check if the globalTxId transaction is null to avoid the message sending failure
if (context.globalTxId() == null) {
throw new OmegaException("Cannot find the globalTxId from OmegaContext. Please using @SagaStart to start a global transaction.");
}
String localTxId = context.localTxId();
context.newLocalTxId();
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
int forwardRetries = compensable.forwardRetries();
// 可以有前向后向补偿,不过一般用default
RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(forwardRetries);
try {
return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, forwardRetries);
} finally {
context.setLocalTxId(localTxId);
LOG.debug("Restored context back to {}", context);
}
}
- 其他代码类似sagastart也是需要上传给alpha, 这里了注意远程调用carbooking接口时会rpc filter拦截,header里面填充请求头加上全局事务id信息。全局事务id比如雪花算法snowflake可以生成。
alpha集群开启时
- 集群高可用保证,依赖于akka sharding 分片
-
参考文章14 15对akka描述更详细
alpha集群开启时.png
基于servicecomb的saga思考
- 高可用: 集群部署,akka fsm状态机高可用由akk保证
- 高性能: grpc通信
- 高可靠: 主要是服务于客户端的高可靠
一: 调用方a调用服务器b,此时如果b服务器宕机导致没有给事务协调器alpha发end事件如果处理?或者调用方宕机怎么办?
1. 依靠补偿接口有处理无效补偿事件的能力。执行分支事务与RPC发送结束事件本来就无法保证原子性。所以这里直接进行补偿接口调用,需要补偿接口写好。
2. 分布式事务发起方宕机没结束事件没关系,alpha扫描发现事务没结束事件可以直接考虑进行补偿了。反正本来响应也是异常返回给用户。异常事件时会等30秒左右再调用各个补偿接口防止有子事务为完成。
3. 服务发起方到事务协调器这块的网络有可能出现异常, 重试不行就抛异常,分布式事务还是使调用方耦合了事务协调器
二: 可配置化SagaStartAspect.sender(SagaMessageSender是接口有不同实现)。OmegaSpringConfig -> TransactionAspectConfig -> 此构造方法,springboot回去OmegaSpringConfig找实现SagaMessageSender的bean最终是sagaLoadBalanceSender,这样每个项目可根据需求实现不同策略
@Aspect
@Order(value = 100)
public class SagaStartAspect {
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
private final OmegaContext context;
public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
}
@Bean
SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
final SagaMessageSender sagaMessageSender = new SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
sagaMessageSender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
sagaMessageSender.onDisconnected();
sagaMessageSender.close();
}
}));
return sagaMessageSender;
}
@Bean
SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
return new SagaStartAspect(sender, context);
}
网友评论