美文网首页分布式
seata实现分布式事务

seata实现分布式事务

作者: sunpy | 来源:发表于2022-08-26 22:27 被阅读0次

    seata是什么


    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

    seata官网:https://seata.io/zh-cn/docs/overview/what-is-seata.html

    docker部署seata服务器


    官网操作:https://seata.io/zh-cn/docs/ops/deploy-by-docker.html

    1. 下载镜像:
    docker pull seataio/seata-server:1.4.2
    
    1. 启动容器:
    docker run -d -e SEATA_IP=外网IP -e SEATA_PORT=8091 -p 8091:8091 --name seata-server seataio/seata-server:1.4.2
    

    注意一定要指定地址为外网地址,这样nacos就可以获取了,否则nacos默认获取内网地址,这样客户端连接不上。

    1. 进入容器:
    docker exec -it 容器id /bin/sh
    
    1. 修改配置文件:/seata-server/resources/registry.conf
    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      type = "nacos"
    
      nacos {
        application = "seata-server"
        serverAddr = "外网NACOS的IP:8848"
        group = "DEFAULT_GROUP"
        namespace = ""
        cluster = "default"
        username = "nacos"
        password = "nacos"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      type = "nacos"
    
      nacos {
        serverAddr = "外网NACOS的IP:8848"
        namespace = ""
        group = "DEFAULT_GROUP"
        username = "nacos"
        password = "nacos"
        dataId = "seataServer.properties"
      }
    }
    
    1. 修改配置文件:/seata-server/resources/file.conf
    ## transaction log store, only used in seata-server
    store {
      ## store mode: file、db、redis
      mode = "db"
      ## rsa decryption public key
      publicKey = ""
      ## file store property
      db {
        datasource = "druid"
        ## mysql/oracle/postgresql/h2/oceanbase etc.
        dbType = "mysql"
        driverClassName = "com.mysql.cj.jdbc.Driver"
        ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
        url = "jdbc:mysql://外网数据库IP:3389/seata?rewriteBatchedStatements=true"
        user = "root"
        password = "password"
        minConn = 5
        maxConn = 100
        globalTable = "global_table"
        branchTable = "branch_table"
        lockTable = "lock_table"
        queryLimit = 100
        maxWait = 5000
      } 
    }
    
    1. 建立seata数据库,导入配置sql文件:
    -- -------------------------------- The script used when storeMode is 'db' --------------------------------
    -- the table to store GlobalSession data
    CREATE TABLE IF NOT EXISTS `global_table`
    (
        `xid`                       VARCHAR(128) NOT NULL,
        `transaction_id`            BIGINT,
        `status`                    TINYINT      NOT NULL,
        `application_id`            VARCHAR(32),
        `transaction_service_group` VARCHAR(32),
        `transaction_name`          VARCHAR(128),
        `timeout`                   INT,
        `begin_time`                BIGINT,
        `application_data`          VARCHAR(2000),
        `gmt_create`                DATETIME,
        `gmt_modified`              DATETIME,
        PRIMARY KEY (`xid`),
        KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
        KEY `idx_transaction_id` (`transaction_id`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    -- the table to store BranchSession data
    CREATE TABLE IF NOT EXISTS `branch_table`
    (
        `branch_id`         BIGINT       NOT NULL,
        `xid`               VARCHAR(128) NOT NULL,
        `transaction_id`    BIGINT,
        `resource_group_id` VARCHAR(32),
        `resource_id`       VARCHAR(256),
        `branch_type`       VARCHAR(8),
        `status`            TINYINT,
        `client_id`         VARCHAR(64),
        `application_data`  VARCHAR(2000),
        `gmt_create`        DATETIME(6),
        `gmt_modified`      DATETIME(6),
        PRIMARY KEY (`branch_id`),
        KEY `idx_xid` (`xid`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    -- the table to store lock data
    CREATE TABLE IF NOT EXISTS `lock_table`
    (
        `row_key`        VARCHAR(128) NOT NULL,
        `xid`            VARCHAR(128),
        `transaction_id` BIGINT,
        `branch_id`      BIGINT       NOT NULL,
        `resource_id`    VARCHAR(256),
        `table_name`     VARCHAR(32),
        `pk`             VARCHAR(36),
        `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
        `gmt_create`     DATETIME,
        `gmt_modified`   DATETIME,
        PRIMARY KEY (`row_key`),
        KEY `idx_status` (`status`),
        KEY `idx_branch_id` (`branch_id`),
        KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    CREATE TABLE IF NOT EXISTS `distributed_lock`
    (
        `lock_key`       CHAR(20) NOT NULL,
        `lock_value`     VARCHAR(20) NOT NULL,
        `expire`         BIGINT,
        primary key (`lock_key`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
    
    1. 在nacos中配置seata-server:

    配置内容:

    service.vgroupMapping.zhishu_group=default
    service.enableDegrade=false
    service.disableGlobalTransaction=false
    
    store.mode=db
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.jdbc.Driver
    store.db.url=jdbc:mysql://外网IP:3389/seata?useUnicode=true&rewriteBatchedStatements=true
    store.db.user=root
    store.db.password=password
    store.db.minConn=5
    store.db.maxConn=30
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.distributedLockTable=distributed_lock
    store.db.queryLimit=100
    store.db.lockTable=lock_table
    store.db.maxWait=5000
    

    详细配置:https://github.com/seata/seata/blob/1.4.0/script/config-center/config.txt

    1. docker重启seata服务器:
    docker restart 容器id
    
    1. 查看seata-server服务是否加载到了nacos上:

    seata分布式事务中AT模式


    AT事务参照2PC算法。AT事务的思路就是快照思路。
    业务数据提交时,自动拦截所有的SQL,分别保存SQL对数据修改前后的快照。
    如果分布式事务成功,那么直接删除快照中的记录。
    如果分布式事务失败,那么事务回滚,根据日志数据自动产生用于补偿的逆向SQL。

    springboot整合seata


    应用seata实现分布式业务中的AT模式:
    导入jar包:

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>seata-spring-boot-starter</artifactId>
                <groupId>io.seata</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.4.2</version>
    </dependency>
    

    业务数据库中添加undo_log表。

    drop table `undo_log`;
    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    

    application.yml配置:

    seata:
      enabled: true
      application-id: ${spring.application.name}
      enable-auto-data-source-proxy: true #是否开启数据源自动代理
      tx-service-group: admin-service-seata #需要和nacos上的service.vgroupMapping.admin-service-seata保持一致
      service:
        vgroup-mapping:
          admin-service-seata: default
        disable-global-transaction: false
    
      registry:
        type: nacos
        nacos:
          application: seata-server
          server-addr: IP:8848
          group: "DEFAULT_GROUP"
          namespace: ""
          username: "nacos"
          password: "nacos"
      config:
        type: nacos
        nacos:
          server-addr: IP:8848
          group: "DEFAULT_GROUP"
          namespace: ""
          username: "nacos"
          password: "nacos"
      data-source-proxy-mode: AT
    

    admin-service业务逻辑service实现:

    @GlobalTransactional(rollbackFor = CommonException.class)
    @Transactional(rollbackFor = CommonException.class)
    @Override
    public ResultModel<String> insertUser(@NonNull String id,
                                          @NonNull String username,
                                          @NonNull String password,
                                          @NonNull Integer roleId) throws CommonException {
        ResultModel<String> resultModel = new ResultModel<>();
        User user = new User();
        user.setId(id);
        user.setUsername(username);
        user.setPassword(password);
        user.setRoleId(roleId);
        userMapper.insert(user);
    
        ResultModel<String> teacherResult = teacherFeign.addTeacher(username, id);
    
        if (teacherResult.getSuccess()) {
            resultModel.setMsg("新增一条记录");
            resultModel.setTime(TimeUtil.getNowTime());
            resultModel.setRes(id);
            log.info("本地事务执行成功,新增一条记录 id = " + id);
    
            return resultModel;
        }
    
        resultModel.setSuccess(false);
        resultModel.setCode(500);
        resultModel.setMsg("新增一条记录失败");
        resultModel.setTime(TimeUtil.getNowTime());
        resultModel.setRes(id);
        return resultModel;
    }
    

    feign调用服务:

    @FeignClient(name="teacher-service")
    public interface TeacherFeign {
        @PostMapping("/teacher/add")
        ResultModel<String> addTeacher(@RequestParam("name") String name, @RequestHeader("userId") String userId) throws CommonException;
    }
    

    调用另一个系统teacher-service服务teacherFeign.addTeacher(username, id);

    @Transactional(rollbackFor = CommonException.class)
    @Override
    public ResultModel<String> addTeacherInfo(String teacherId, String name) throws CommonException {
        Teacher teacher = new Teacher();
        teacher.setId(teacherId);
        teacher.setTeacherName(name);
    
        throw new CommonException("教师表中该id已存在");
    }
    

    controller层:

    @PostMapping("/trans/add")
    public ResultModel<String> addUser(@RequestParam("username") String username,
                                        @RequestHeader("password") String password,
                                        @RequestHeader("roleId") Integer roleId) {
        UserBO userBO = new UserBO();
        userBO.setUsername(username);
        userBO.setPassword(password);
        userBO.setRoleId(roleId);
        String id = UUID.randomUUID().toString().replace("-","");
        return userService.insertUser(id, username, password, roleId);
    }
    

    由于teacher-service服务抛出异常,所以admin-service服务虽然在edu_user新增杨幂记录,但是由于抛出异常,回滚了,所以edu_teacher没有记录。

    相关文章

      网友评论

        本文标题:seata实现分布式事务

        本文链接:https://www.haomeiwen.com/subject/hntagrtx.html