美文网首页
分布式事务

分布式事务

作者: 后来丶_a24d | 来源:发表于2021-01-08 11:49 被阅读0次

    目录

    目录.png

    分布式事务解决方案

    • 长事务: saga
    • 短事务: 设计的时候尽量短事务,能不用分布式事务尽量不用,分布式事务影响性能,也要求业务端保证补偿接口幂等。比如booking + kafka保证分布式事务,可以用掉booking时插入mysql,如果正常结束给个正常status,如果rpc异常结束,后续可以线程扫描进行补偿以达到最终一致性。

    分布式事务应用场景

    刚性事务xa

    • 适用于需要强一致性的业务,并发性不高,需要数据库支持
    • seata的at是柔性事务,第二阶段可以异步,无业务代码入侵,基于sql生成逆向sql,第二阶段可异步

    柔性事务

    seata的at
    • Seata 的 AT 模式基于本地事务的特性,通过拦截并解析 SQL 的方式,记录自定义的回滚日志
    • at支持mysql tidb之类的但是redis es这种就不支持, 需要表有主键
    • 业务0入侵,如果只是sql的二阶段commit/rollback很适合
    最大努力送达
    • 实现有本地消息表之类的
    • 适合用户注册成功后发送邮件短信、电商系统给用户发送优惠券、不同系统数据同步等需要保证最终一致性的场景(同步加异步)
    tcc
    • 适合金融核心系统的特点是一致性要求高(业务上的隔离性)、短流程、并发高
    saga
    • 业务流程长、业务流程多;参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口;典型业务系统:如金融网路(与外部机构对接)、互联网微贷、渠道整合、分布式架构下服务集成等业务系统
    saga服务端或者saga客户端重启或者宕机
    1. saga服务端出问题:
      本地消息补偿
    2. saga客户端宕机或者重启
      根据记录状态的消息进行相应补偿,要求客户端调用的被调用方业务接口幂等,补偿业务和saga服务,然后saga服务再进行补偿
    3. 普通业务场景下客户端宕机或者重启
      本地消息补偿,事件开始时就记录,定时任务扫描看状态进行补偿,同样要求被调用方业务接口幂等

    基于seata分布式事务 vs servicecomb

    • seata的saga需要熟悉状态机那块的写法,servicecomb的saga可以用注解实现saga

    分布式事务 servicecomb的saga原理

    • 这里使用官方git的demo, booking -> car -> hotel


      booking -> car -> hotel.png
    最开始调用时先经过sagastart注解
    sagastart注解流程.png
    • SagaStartAspect
    @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
    Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
        initializeOmegaContext();
        if(context.getAlphaMetas().isAkkaEnabled() && sagaStart.timeout()>0){
          SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
          return wrapper.apply(joinPoint,sagaStart,context);
        }else{
          SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
          return wrapper.apply(joinPoint,sagaStart,context);
        }
    }
    
    private void initializeOmegaContext() {
        context.setLocalTxId(context.newGlobalTxId());
    }
    
    • SagaStartAnnotationProcessor
    public AlphaResponse preIntercept(int timeout) {
        try {
          return sender
              .send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
        } catch (OmegaException e) {
          throw new TransactionalException(e.getMessage(), e.getCause());
        }
    }
    
    SagaStartAnnotationProcessor远程调用alpha步骤1.2.3
    远程调用alpha步骤1.2.3.png
    远程调用alpha流程图.png
    • SagaLoadBalanceSender extends LoadBalanceSenderAdapter
    @Override
    public AlphaResponse send(TxEvent event) {
        do {
          final SagaMessageSender messageSender = pickMessageSender();
          Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new SenderExecutor<TxEvent>() {
            @Override
            public AlphaResponse apply(TxEvent event) {
              return messageSender.send(event);
            }
          });
          if (response.isPresent()) return response.get();
        } while (!Thread.currentThread().isInterrupted());
    
        throw new OmegaException("Failed to send event " + event + " due to interruption");
    }
    
    LoadBalanceSenderAdapter
    // 轮询做负载
    public <T> T pickMessageSender() {
        return (T) senderPicker.pick(loadContext.getSenders(),
            loadContext.getGrpcOnErrorHandler().getGrpcRetryContext().getDefaultMessageSender());
    }
    // 统一模板方法,回调子类自己的方法
    public <T> Optional<AlphaResponse> doGrpcSend(MessageSender messageSender, T event, SenderExecutor<T> executor) {
        AlphaResponse response = null;
        try {
          long startTime = System.nanoTime();
          response = executor.apply(event);
          loadContext.getSenders().put(messageSender, System.nanoTime() - startTime);
        } catch (OmegaException e) {
          throw e;
        } catch (Exception e) {
          LOG.error("Retry sending event {} due to failure", event, e);
          loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
        }
        return Optional.fromNullable(response);
    }
    
    booking调用car业务代码时@Compensable注解
    booking调用car业务代码.png
    • TransactionAspect
    @Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
    Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        // just check if we need to setup the transaction context information first
        TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
        if (transactionContext != null) {
          populateOmegaContext(context, transactionContext);
        }
        // SCB-1011 Need to check if the globalTxId transaction is null to avoid the message sending failure
        if (context.globalTxId() == null) {
          throw new OmegaException("Cannot find the globalTxId from OmegaContext. Please using @SagaStart to start a global transaction.");
        }
        String localTxId = context.localTxId();
        context.newLocalTxId();
        LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
      
        int forwardRetries = compensable.forwardRetries();
        // 可以有前向后向补偿,不过一般用default
        RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(forwardRetries);
        try {
          return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, forwardRetries);
        } finally {
          context.setLocalTxId(localTxId);
          LOG.debug("Restored context back to {}", context);
        }
    }
    
    • 其他代码类似sagastart也是需要上传给alpha, 这里了注意远程调用carbooking接口时会rpc filter拦截,header里面填充请求头加上全局事务id信息。全局事务id比如雪花算法snowflake可以生成。
    alpha集群开启时
    • 集群高可用保证,依赖于akka sharding 分片
    • 参考文章14 15对akka描述更详细


      alpha集群开启时.png

    基于servicecomb的saga思考

    • 高可用: 集群部署,akka fsm状态机高可用由akk保证
    • 高性能: grpc通信
    • 高可靠: 主要是服务于客户端的高可靠

    一: 调用方a调用服务器b,此时如果b服务器宕机导致没有给事务协调器alpha发end事件如果处理?或者调用方宕机怎么办?
    1. 依靠补偿接口有处理无效补偿事件的能力。执行分支事务与RPC发送结束事件本来就无法保证原子性。所以这里直接进行补偿接口调用,需要补偿接口写好。
    2. 分布式事务发起方宕机没结束事件没关系,alpha扫描发现事务没结束事件可以直接考虑进行补偿了。反正本来响应也是异常返回给用户。异常事件时会等30秒左右再调用各个补偿接口防止有子事务为完成。
    3. 服务发起方到事务协调器这块的网络有可能出现异常, 重试不行就抛异常,分布式事务还是使调用方耦合了事务协调器

    二: 可配置化SagaStartAspect.sender(SagaMessageSender是接口有不同实现)。OmegaSpringConfig -> TransactionAspectConfig -> 此构造方法,springboot回去OmegaSpringConfig找实现SagaMessageSender的bean最终是sagaLoadBalanceSender,这样每个项目可根据需求实现不同策略

    @Aspect
    @Order(value = 100)
    public class SagaStartAspect {
    
      private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
    
      private final OmegaContext context;
    
      public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
        this.context = context;
        this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
      }
    }
    
    @Bean
    SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
        final SagaMessageSender sagaMessageSender = new SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
        sagaMessageSender.onConnected();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
          @Override
          public void run() {
            sagaMessageSender.onDisconnected();
            sagaMessageSender.close();
          }
        }));
        return sagaMessageSender;
    }
    
    @Bean
    SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
        return new SagaStartAspect(sender, context);
    }
    

    参考文章

    1. 分布式事务看这一篇就够了
    2. 分布式事务:2PC、3PC、SAGA、TCC
    3. Seata 分布式事务实践和开源详解 | GIAC 实录
    4. at与xa区别
    5. saga使用场景
    6. 可靠消息最终一致性(本地消息表)
    7. 云原生时代分布式事务
    8. 两天看完分布式事务
    9. 带你读透 SEATA 的 AT 模式
    10. 分布式事务 Seata AT模式原理与实战
    11. serviccomb-omega源码解读
    12. servicecomb-saga各个issue,优化
    13. Akka中文指南
    14. akka分片
    15. akka分片以及分片故障自愈
    16. servicecomb文档

    相关文章

      网友评论

          本文标题:分布式事务

          本文链接:https://www.haomeiwen.com/subject/pevjgktx.html