美文网首页springcloudJavaJava 程序员
SpringCloud用Seata处理分布式事务

SpringCloud用Seata处理分布式事务

作者: 马小莫QAQ | 来源:发表于2022-06-07 16:12 被阅读0次

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

    • TC (Transaction Coordinator) - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚。

    • TM (Transaction Manager) - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

    • RM (Resource Manager) - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    Seata服务端部署(TC)

    为了简化过程,部署单机版。高可用部署参考

    • 1、部署nacos(跳过)

    [图片上传失败...(image-144e38-1654589365211)]

    • 2、添加nacos配置

    dataId: seataServer.properties,Group: SEATA_GROUP

    #Transport configuration, for client and server
    transport.type=TCP
    transport.server=NIO
    transport.heartbeat=true
    transport.enableTmClientBatchSendRequest=false
    transport.enableRmClientBatchSendRequest=true
    transport.enableTcServerBatchSendResponse=false
    transport.rpcRmRequestTimeout=30000
    transport.rpcTmRequestTimeout=30000
    transport.rpcTcRequestTimeout=30000
    transport.threadFactory.bossThreadPrefix=NettyBoss
    transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
    transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
    transport.threadFactory.shareBossWorker=false
    transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
    transport.threadFactory.clientSelectorThreadSize=1
    transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
    transport.threadFactory.bossThreadSize=1
    transport.threadFactory.workerThreadSize=default
    transport.shutdown.wait=3
    transport.serialization=seata
    transport.compressor=none
    
    #Transaction routing rules configuration, only for the client
    service.vgroupMapping.default_tx_group=dev
    #If you use a registry, you can ignore it
    service.default.grouplist=127.0.0.1:8091
    service.enableDegrade=false
    service.disableGlobalTransaction=false
    
    #Transaction rule configuration, only for the client
    client.rm.asyncCommitBufferLimit=10000
    client.rm.lock.retryInterval=10
    client.rm.lock.retryTimes=30
    client.rm.lock.retryPolicyBranchRollbackOnConflict=true
    client.rm.reportRetryCount=5
    client.rm.tableMetaCheckEnable=true
    client.rm.tableMetaCheckerInterval=60000
    client.rm.sqlParserType=druid
    client.rm.reportSuccessEnable=false
    client.rm.sagaBranchRegisterEnable=false
    client.rm.sagaJsonParser=fastjson
    client.rm.tccActionInterceptorOrder=-2147482648
    client.tm.commitRetryCount=5
    client.tm.rollbackRetryCount=5
    client.tm.defaultGlobalTransactionTimeout=60000
    client.tm.degradeCheck=false
    client.tm.degradeCheckAllowTimes=10
    client.tm.degradeCheckPeriod=2000
    client.tm.interceptorOrder=-2147482648
    client.undo.dataValidation=true
    client.undo.logSerialization=kryo
    client.undo.onlyCareUpdateColumns=true
    server.undo.logSaveDays=7
    server.undo.logDeletePeriod=86400000
    client.undo.logTable=undo_log
    client.undo.compress.enable=true
    client.undo.compress.type=zip
    client.undo.compress.threshold=64k
    #For TCC transaction mode
    tcc.fence.logTableName=tcc_fence_log
    tcc.fence.cleanPeriod=1h
    
    #Log rule configuration, for client and server
    log.exceptionRate=100
    
    #Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
    store.mode=db
    store.lock.mode=db
    store.session.mode=db
    #Used for password encryption
    store.publicKey=
    
    #These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.cj.jdbc.Driver
    store.db.url=jdbc:mysql://192.168.137.1:13306/seata?serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf8
    store.db.user=seata
    store.db.password=seata
    store.db.minConn=2
    store.db.maxConn=8
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.distributedLockTable=distributed_lock
    store.db.queryLimit=100
    store.db.lockTable=lock_table
    store.db.maxWait=5000
    
    #Transaction rule configuration, only for the server
    server.recovery.committingRetryPeriod=1000
    server.recovery.asynCommittingRetryPeriod=1000
    server.recovery.rollbackingRetryPeriod=1000
    server.recovery.timeoutRetryPeriod=1000
    server.maxCommitRetryTimeout=-1
    server.maxRollbackRetryTimeout=-1
    server.rollbackRetryTimeoutUnlockEnable=false
    server.distributedLockExpireTime=10000
    server.xaerNotaRetryTimeout=60000
    server.session.branchAsyncQueueSize=5000
    server.session.enableBranchAsyncRemove=false
    
    #Metrics configuration, only for the server
    metrics.enabled=false
    metrics.registryType=compact
    metrics.exporterList=prometheus
    metrics.exporterPrometheusPort=9898
    

    service.vgroupMapping.default_tx_group=dev(default_tx_group事务分组,dev是tc集群名称)
    client.undo.logSerialization=kryo(默认是jackson在1.4.2版本及以下版本会报错,并且json体积也会更大)

    LICENSE  bin  conf  lib  logs
    
    • 5、修改conf/registry.conf
    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      # nacos 作为注册中心
      type = "nacos"
    
      nacos {
        application = "seata-server"
        serverAddr = "192.168.137.1:8848"
        group = "SEATA_GROUP"
        namespace = "seata"
        cluster = "dev"
        username = "nacos"
        password = "nacos"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      # nacos 作为配置中心
      type = "nacos"
    
      nacos {
        serverAddr = "192.168.137.1:8848"
        namespace = "seata"
        group = "SEATA_GROUP"
        username = "nacos"
        password = "nacos"
        dataId = "seataServer.properties"
      }
    }
    

    cluster = "dev" 和上面service.vgroupMapping.default_tx_group=dev对应

    • 6、启动Seata
    # 事务日志使用db存储
    ./bin/seata-server.sh -m db
    14:16:22.600  INFO --- [                     main] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} inited
    14:16:22.987  INFO --- [                     main] i.s.core.rpc.netty.NettyServerBootstrap  : Server started, listen port: 8091
    
    参数 全写 作用 备注
    -h --host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
    -p --port 指定 server 启动的端口 默认为 8091
    -m --storeMode 事务日志存储方式 支持file,db,redis,默认为 file 注:redis需seata-server 1.3版本及以上
    -n --serverNode 用于指定seata-server节点ID 1,2,3..., 默认为 1
    -e --seataEnv 指定 seata-server 运行环境 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

    项目集成Seata

    • 1、 添加依赖
    implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-seata'
    implementation 'com.esotericsoftware:kryo'
    implementation 'com.esotericsoftware.kryo:kryo'
    implementation 'de.javakaffee:kryo-serializers'
    
    • 2、添加配置
    spring: 
      cloud:
        alibaba:
          seata:
            tx-service-group: default_tx_group
    seata:
      enabled: true
      registry:
        nacos:
          cluster: dev
          server-addr: 192.168.137.1:8848
          group: SEATA_GROUP
          username: nacos
          password: nacos
          namespace: seata
        type: nacos
      config:
        type: nacos
        nacos:
          server-addr: 192.168.137.1:8848
          group : SEATA_GROUP
          namespace: seata
          dataId: seataServer.properties
          username: nacos
          password: nacos
      service:
        vgroup-mapping:
          default_tx_group: dev
    

    tx-service-group: default_tx_group和service.vgroupMapping.default_tx_group要一致,且service.vgroup-mapping.default_tx_group: dev(为TC集群名称)

    • 3、编码

    TM事务发起方

    @GlobalTransactional(timeoutMills = 30000, name = "seataTest", rollbackFor = Exception.class)
    @Override
    public void seataTest() {
        final SysLog sysLog = new SysLog();
        sysLog.setId(IdUtils.id());
        sysLog.setType(SysLogTypeEnum.INFO.getType());
        sysLog.setClassName("SysLogServiceImpl");
        sysLog.setMethodName("seataTest");
        sysLog.setRequestUri("/seata");
        sysLog.setParams("");
        sysLog.setRequestIp("");
        save(sysLog);
        if (communityClient.test().isFail()) {
            throw ServiceException.of("client fail");
        }
    }
    

    只需要增加@GlobalTransactional注解

    @FeignClient(value = "sell", path = "/sellCommunity", fallbackFactory = SellCommunityClient.Fallback.class)
    public interface SellCommunityClient {
    
        /**
         * 测试分布式事务
         */
        @PostMapping("/test")
        Result<Void> test();
    
        @Component
        @Slf4j
        class Fallback implements FallbackFactory<SellCommunityClient> {
    
            @Override
            public SellCommunityClient create(Throwable cause) {
                log.error("异常原因:{}", cause.getMessage(), cause);
                return () -> {
                    log.info("------test Fallback------");
                    return Result.fail("Fallback");
                };
            }
    
        }
    }
    

    RM分支事务参与方

    @RestController
    @RequestMapping("/sellCommunity")
    @RequiredArgsConstructor
    @Getter
    public class SellCommunityController extends BaseController<SellCommunity, SellCommunityService> {
    
        private final SellCommunityService service;
    
        @PostMapping("/test")
        public Result<Void> test() {
            service.testSeata();
            return Result.success();
        }
    
    }
    
    @Service
    public class SellCommunityServiceImpl extends BaseServiceImpl<SellCommunityMapper, SellCommunity> implements SellCommunityService {
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        public void testSeata() {
            final SellCommunity community = new SellCommunity();
            community.setName("test");
            community.setAddress("beijing");
            community.setHouseNumber(1000);
            insert(community);
    //        System.out.println(1/0);
        }
    }
    

    注意:
    1、项目有全局异常处理的,默认是不会回滚的,需要自己根据状态码处理
    2、使用fallbackFactory降级,默认是不会回滚的,同样需要在fallbackFactory修改状态码

    // 状态码判断
    public boolean isFail(){
        return this.code != ResultEnum.SUCCESS.getCode();
    }
    // 根据状态码抛出异常
    if (communityClient.test().isFail()) {
        throw ServiceException.of("client fail");
    }
    

    作者:realmadrid_juejin
    链接:https://juejin.cn/post/7097113972616200223
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:SpringCloud用Seata处理分布式事务

        本文链接:https://www.haomeiwen.com/subject/xzyemrtx.html