美文网首页Java程序员互联网科技
盘点 Seata : Server 端事务的 Session 如

盘点 Seata : Server 端事务的 Session 如

作者: 熬夜不加班 | 来源:发表于2022-04-21 15:02 被阅读0次

    一 .前言

    之前提及过 Seata Client 的请求流程 , 这一篇从 Session 的处理来看一下 Seata Server 端的处理 .

    每一次 Seata 的全局操作都会创建一个 Session , 并且往表中插入事务数据.

    二 . global_table 表

    先来看一下 global_table 的表结构

    CREATE TABLE `global_table` (
      `xid` varchar(128) NOT NULL,
      `transaction_id` bigint(20) DEFAULT NULL,
      `status` tinyint(4) NOT NULL,
      `application_id` varchar(32) DEFAULT NULL,
      `transaction_service_group` varchar(32) DEFAULT NULL,
      `transaction_name` varchar(128) DEFAULT NULL,
      `timeout` int(11) DEFAULT NULL,
      `begin_time` bigint(20) DEFAULT NULL,
      `application_data` varchar(2000) DEFAULT NULL,
      `gmt_create` datetime DEFAULT NULL,
      `gmt_modified` datetime DEFAULT NULL,
      PRIMARY KEY (`xid`),
      KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
      KEY `idx_transaction_id` (`transaction_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `branch_table` (
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(128) NOT NULL,
      `transaction_id` bigint(20) DEFAULT NULL,
      `resource_group_id` varchar(32) DEFAULT NULL,
      `resource_id` varchar(256) DEFAULT NULL,
      `branch_type` varchar(8) DEFAULT NULL,
      `status` tinyint(4) DEFAULT NULL,
      `client_id` varchar(64) DEFAULT NULL,
      `application_data` varchar(2000) DEFAULT NULL,
      `gmt_create` datetime(6) DEFAULT NULL,
      `gmt_modified` datetime(6) DEFAULT NULL,
      PRIMARY KEY (`branch_id`),
      KEY `idx_xid` (`xid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    三 . Server Session 处理一览

    我们通过启动参数 -m 对请求 STORE_MODE 进行配置 : seata-server.bat -m db

    整个 Session 的处理会分别对2个操作进行处理 , 一个为 global_table , 一个为 branch_table , 依次来说 :

    Pro 1 : global_table 的作用
    global_table 用于持久化全局事务 , 可以通过 store.db.global.table 进行配置

    Pro 2 : branch_table 的作用
    branch_table 用于标识分支事务 , 可以通过 store.db.branch.table 进行配置

    数据结构

    # C- LogStoreDataBaseDAO # insertGlobalTransactionDO : 插入 global_table   
    INSERT INTO `seata`.`global_table` 
        ( `xid`, 
        `transaction_id`, 
        `status`, 
        `application_id`, 
        `transaction_service_group`, 
        `transaction_name`, 
        `timeout`, 
        `begin_time`, 
        `application_data`, 
        `gmt_create`, 
        `gmt_modified` )
    VALUES
        ( '192.168.181.2:8091:8466916507467911205',
        8466916507467911205,
        1, 
        'business-seata-example', 
        'business-service-seata-service-group', 
        'dubbo-gts-seata-example', 
        300000, 
        1624863673423, 
        NULL, 
        '2021-06-28 15:01:28', 
        '2021-06-28 15:01:28' );
    
    # C- LogStoreDataBaseDAO # insertBranchTransactionDO  
    INSERT INTO `seata`.`branch_table`
        (`branch_id`, 
        `xid`, 
        `transaction_id`, 
        `resource_group_id`,
        `resource_id`, 
        `branch_type`, 
        `status`, 
        `client_id`, 
        `application_data`,
        `gmt_create`, 
        `gmt_modified`) 
    VALUES
        (8466916507467911829, 
        '192.168.181.2:8091:8466916507467911205', 
        8466916507467911205, 
        NULL, 
        'jdbc:mysql://127.0.0.1:3306/seata', 
        'AT', 
        0, 
        'storage-seata-example:192.168.181.2:51964', 
        NULL, 
        '2021-06-28 15:35:18.534107', 
        '2021-06-28 15:35:18.534107');
    

    3.1 global_table 的处理流程

    配置 STORE_MODE 为 db 后 , 会使用 DataBaseSessionManager 和 DataBaseTransactionStoreManager 进行业务的处理

    // 创建的调用入口 (此处忽略前置逻辑 , 但从 Session 的创建开始)
    C- AbstractSessionManager # onBegin
    C- DataBaseSessionManager # addGlobalSession
    C- DataBaseTransactionStoreManager # writeSession (此处类型为 GLOBAL_ADD((byte)1))
    

    从 Step 1 中可以看到 , 添加时会调用 writeSession , 这是个很重要的方法 , 基本上所有的编辑session 操作都会经历该类 , 可以通过 Debug 该部分

    /**
    * DataBaseTransactionStoreManager 该方法中进行了全局事务和分支事务的管理
    **/
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
            // 插入全局事务
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
            // 更新全局事务
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
            // 删除全局事务
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
            // 插入分支事务
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
            // 更新分支事务
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
            // 删除分支事务
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
                throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    }
    

    [Pro31001] : logOperation 的作用和来源

    LogOperation 作用 :

    LogOperation 是一个枚举类 , 用于表示操作的类型

    enum LogOperation {
        GLOBAL_ADD((byte)1),
        GLOBAL_UPDATE((byte)2),
        GLOBAL_REMOVE((byte)3),
    
        BRANCH_ADD((byte)4),
        BRANCH_UPDATE((byte)5),
        BRANCH_REMOVE((byte)6);
    
        private byte code;
    }
    

    LogOperation 的来源:

    在调用该流程的时候 , 会传入对应的 LogOperation.code . 例如 DataBaseSessionManager 操作中

    
    C- DataBaseSessionManager
        M- addGlobalSession
            - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        M- updateGlobalSessionStatus
            - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        M- removeGlobalSession
            - transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session)
    

    3.2 branch_table 的处理逻辑

    //======== 以下是 Beanch 逻辑    
    C- DataBaseTransactionStoreManager # writeSession (此处类型为 BRANCH_ADD((byte)4))
    
    // 最终调用有以下方法
    C- LogStoreDataBaseDAO 
        M- insertBranchTransactionDO  
        M- updateBranchTransactionDO
        M- deleteBranchTransactionDO
    

    四 . Session 的初始化流程

    4.1 Session 的初始化

    Step 1 : 启动入口 Server # main , 其中会开启 Session

    // 在 server # main 启动方法中 , 会调用以下语句
    SessionHolder.init(parameterParser.getStoreMode());
    

    Step 2 : SessionHolder init 流程

    C- SessionHolder # init 中进行了如下操作 :

    • 获得配置的 store.mode , 从下面的代码可以看到支持 DB , FILE , REDIS
    • 通过 EnhancedServiceLoader#load 加载 SessionManager
    public static void init(String mode) {
    
        // 获取配置文件中的 store.mode 属性
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
        }
    
        // 构建 StoreMode
        StoreMode storeMode = StoreMode.get(mode);
        if (StoreMode.DB.equals(storeMode)) {
                // 基础会话管理器
                ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
                // 异步会话管理器
                ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                    new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
                // 重试提交会话管理器
                RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                    new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
               // 重试回退会话管理器 
                RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                    new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
        } else if (StoreMode.FILE.equals(storeMode)) {
              //..... 省略
        } else if (StoreMode.REDIS.equals(storeMode)) {
              //..... 省略
        } else {
            // unknown store
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        // 详见最后 , 刷新操作
        reload(storeMode);
    }
    

    Step 3 : InnerEnhancedServiceLoader 加载的方式

    public static <S> S load(Class<S> service, String activateName) throws EnhancedServiceNotFoundException {
        // SPI 核心 : 这里就是一个简单的下层调用 , 这里简单说一下 InnerEnhancedServiceLoader
        return InnerEnhancedServiceLoader.getServiceLoader(service).load(activateName, findClassLoader());
    }
    
    // getServiceLoader 获取 ServiceLoader
    private static <S> InnerEnhancedServiceLoader<S> getServiceLoader(Class<S> type) {
    // 主要就是通过 SERVICE_LOADERS 获取整个集合 , 另外可以看到 , 这里是每次调用时为空就会先创建 , 再缓存一遍
    return (InnerEnhancedServiceLoader<S>)CollectionUtils.computeIfAbsent(SERVICE_LOADERS, type,
        key -> new InnerEnhancedServiceLoader<>(type));
    }
    

    [PRO:] InnerEnhancedServiceLoader 的作用 ?

    InnerEnhancedServiceLoader 是 EnhancedServiceLoader 的内部类 :
    
    // Pro : EnhancedServiceLoader 作用 
    EnhancedServiceLoader 是 Seata SPI 实现核心类 , Seata 通过 SPI 机制来实现 seata 的扩展 , 使其可以兼容多种注册中心 :
    
    EnhancedServiceLoader 中 load 有如下的方式加载一个服务 : 
        M- load(Class<S> service, ClassLoader loader) : 通过服务类型和加载器加载
        M- load(Class<S> service) : 通过服务类型加载
        M- load(Class<S> service, String activateName) : 通过服务类型和激活名 (加载 ExtensionDefinition 会使用该名称)
        M- load(Class<S> service, String activateName, ClassLoader loader)
        M- load(Class<S> service, String activateName, Object[] args) : 带属性参数 (Instance 创建实例时会使用该参数)
        // load 同时提供载入一组服务
        M- loadAll(Class<S> service)
        M- loadAll(Class<S> service, Class[] argsType, Object[] args)
    
    // Pro :  SPI Server 存放的位置
    Seata 的  Service 类和 Spring factories 基本上一直 , 也是放在 META-INF.service 中 , 其中提供了如下配置  -> PIC30001 
    
    // Pro :  EnhancedServiceLoader 的 子类
    EnhancedServiceLoader 中有一个内部类 : C- InnerEnhancedServiceLoader  , 主要作用为避免多次载入时出现不必要的载入
    
    InnerEnhancedServiceLoader 中提供了如下的参数 : 
    // class 对应的 InnerEnhancedServiceLoader 集合
    ConcurrentMap<Class<?>, InnerEnhancedServiceLoader<?>> SERVICE_LOADERS = new ConcurrentHashMap<>();
    // Holder 内部有一个 volatile 参数用于保存对象, 保证多线程可见
    Holder<List<ExtensionDefinition>> definitionsHolder = new Holder<>();
    // ExtensionDefinition 集合
    ConcurrentMap<ExtensionDefinition, Holder<Object>> definitionToInstanceMap = new ConcurrentHashMap<>();
    // name 对应的 ExtensionDefinition 集合
    ConcurrentMap<String, List<ExtensionDefinition>> nameToDefinitionsMap = new ConcurrentHashMap<>();
    // ExtensionDefinition class类型 对应的 ExtensionDefinition
    ConcurrentMap<Class<?>, ExtensionDefinition> classToDefinitionMap = new ConcurrentHashMap<>();      
    

    PIC30001 : META-INF.service 数据

    image.png

    Step 4 : 指定classLoader来加载 server provider

    该单元是主要的处理流程 , 用于

    C- EnhancedServiceLoader
    private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes,
                                    Object[] args) {
    
        // activateName 判空操作 , 为空抛出异常 IllegalArgumentException        
        try {
            // 1 . 从配置文件 (META-INF 中加载所有的 Extension 对象)
            loadAllExtensionClass(loader);
            // 2 . 通过激活名获得 ExtensionDefinition 类数据
            ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName);
            // 3 . 获得实例
            return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args);
        } catch (Throwable e) {
            // .... 异常处理省略
        }
    }
    

    Step 5 : loadAllExtensionClass 从配置文件中获取所有的 Extension

    C- EnhancedServiceLoader
    // 1\. 判断和发起加载
    private List<Class> loadAllExtensionClass(ClassLoader loader) {
        List<ExtensionDefinition> definitions = definitionsHolder.get();
        if (definitions == null) {
            synchronized (definitionsHolder) {
                definitions = definitionsHolder.get();
                if (definitions == null) {
                      // 加锁后查询所有的 ExtensionDefinition , 避免线程冲突
                    definitions = findAllExtensionDefinition(loader);
                    definitionsHolder.set(definitions);
                }
            }
        }
        return definitions.stream().map(def -> def.getServiceClass()).collect(Collectors.toList());
    }
    
    // 2\. 加载流程
    private List<ExtensionDefinition> findAllExtensionDefinition(ClassLoader loader) {
    
        // 从 META-INF.service 和  META-INF.seata 中获取配置
        List<ExtensionDefinition> extensionDefinitions = new ArrayList<>();
        try {
            loadFile(SERVICES_DIRECTORY, loader, extensionDefinitions);
            loadFile(SEATA_DIRECTORY, loader, extensionDefinitions);
        } catch (IOException e) {
            throw new EnhancedServiceNotFoundException(e);
        }
    
        // 加载所有扩展后,按顺序对缓存进行排序 -> nameToDefinitionsMap
        if (!nameToDefinitionsMap.isEmpty()) {
            for (List<ExtensionDefinition> definitions : nameToDefinitionsMap.values()) {
                Collections.sort(definitions, (def1, def2) -> {
                    int o1 = def1.getOrder();
                    int o2 = def2.getOrder();
                    return Integer.compare(o1, o2);
                });
            }
        }
    
        // 对加载的 extensionDefinitions 进行排序
        if (!extensionDefinitions.isEmpty()) {
            Collections.sort(extensionDefinitions, (definition1, definition2) -> {
                int o1 = definition1.getOrder();
                int o2 = definition2.getOrder();
                return Integer.compare(o1, o2);
            });
        }
    
        return extensionDefinitions;
    }
    

    Step 6 : getCachedExtensionDefinition 获取 BeanDefinition 基础信息

    // 比较简单 , 就是获取 ConcurrentMap<String, List<ExtensionDefinition>> nameToDefinitionsMap 
    private ExtensionDefinition getCachedExtensionDefinition(String activateName) {
        List<ExtensionDefinition> definitions = nameToDefinitionsMap.get(activateName);
        return CollectionUtils.getLast(definitions);
    }
    

    Step 7 : 反射进行初始化

    // 发起流程 :
    loadExtension -> getExtensionInstance -> createNewExtension
    
    // getExtensionInstance 逻辑比较简单 , 就是判断是否为单例从而进行了一个单例模式的创建
    
    // createNewExtension 创建实例
    private S createNewExtension(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args) {
        Class<?> clazz = definition.getServiceClass();
        try {
            S newInstance = initInstance(clazz, argTypes, args);
            return newInstance;
        } catch (Throwable t) {
            throw new IllegalStateException(....);
        }
    }
    
    // initInstance 初始化实例
    private S initInstance(Class implClazz, Class[] argTypes, Object[] args)
            throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
        S s = null;
        if (argTypes != null && args != null) {
            // 获取 构造函数的参数
            Constructor<S> constructor = implClazz.getDeclaredConstructor(argTypes);
            // 如果有参数 ,通过参数创建实例
            s = type.cast(constructor.newInstance(args));
        } else {
            // 使用默认构造器创建 (无参数的情况)
            s = type.cast(implClazz.newInstance());
        }
        if (s instanceof Initialize) {
            // 核心 7-1 实例init初始化
            ((Initialize)s).init();
        }
        return s;
    }
    

    Step 7-1 : DataBaseSessionManager 中 init 操作

    其中关于 DataBase 会使用 DataBaseSessionManager 操作 , 这一块看一下整体的体系 :

    image.png
    public void init() {
        // 初始化 DataBaseTransactionStoreManager
        transactionStoreManager = DataBaseTransactionStoreManager.getInstance();
    }
    
    // PS : Initialize 的实现类都需要实现 init 方法
    public interface Initialize {
        void init();
    }
    

    Step 7-2 : 构造器操作

    
    C- DataBaseTransactionStoreManager
        P- int DEFAULT_LOG_QUERY_LIMIT = 100;
    
    private DataBaseTransactionStoreManager() {
            logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
            // 获取 Datasource 类型
            String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
            // 初始化 dataSource
            DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
            // 构建 LogStoreDataBaseDAO
            logStore = new LogStoreDataBaseDAO(logStoreDataSource);
    }
    
    // [Pro] : ConfigurationKeys 的参数
    String STORE_DB_LOG_QUERY_LIMIT = STORE_DB_PREFIX + "queryLimit";
    
    // [Pro] : DataSourceProvider 的实现类
    io.seata.server.store.DbcpDataSourceProvider
    io.seata.server.store.DruidDataSourceProvider
    io.seata.server.store.HikariDataSourceProvider
    
    // 此处构建了一个 DataSource
    @LoadLevel(name = "hikari")
    public class HikariDataSourceProvider extends AbstractDataSourceProvider {
    
        @Override
        public DataSource generate() {
            Properties properties = new Properties();
            properties.setProperty("dataSource.cachePrepStmts", "true");
            properties.setProperty("dataSource.prepStmtCacheSize", "250");
            properties.setProperty("dataSource.prepStmtCacheSqlLimit", "2048");
            properties.setProperty("dataSource.useServerPrepStmts", "true");
            properties.setProperty("dataSource.useLocalSessionState", "true");
            properties.setProperty("dataSource.rewriteBatchedStatements", "true");
            properties.setProperty("dataSource.cacheResultSetMetadata", "true");
            properties.setProperty("dataSource.cacheServerConfiguration", "true");
            properties.setProperty("dataSource.elideSetAutoCommits", "true");
            properties.setProperty("dataSource.maintainTimeStats", "false");
    
            HikariConfig config = new HikariConfig(properties);
            config.setDriverClassName(getDriverClassName());
            config.setJdbcUrl(getUrl());
            config.setUsername(getUser());
            config.setPassword(getPassword());
            config.setMaximumPoolSize(getMaxConn());
            config.setMinimumIdle(getMinConn());
            config.setAutoCommit(true);
            config.setConnectionTimeout(getMaxWait());
            config.setInitializationFailTimeout(-1);
            return new HikariDataSource(config);
        }
    }
    

    Step 8 : reload 刷新 SessionManager

    在执行完上述逻辑后还没完全 , 注意 Step 2 中最后还有个 Reload 操作 ,该操作会继续处理 DataBaseSessionManager

    c- SessionHolder 
    
    // 这里会议一下之前的属性
    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
    
    protected static void reload(StoreMode storeMode) {
            if (ROOT_SESSION_MANAGER instanceof Reloadable) {
                ((Reloadable) ROOT_SESSION_MANAGER).reload();
            }
    
            // 
            Collection<GlobalSession> allSessions = ROOT_SESSION_MANAGER.allSessions();
            if (CollectionUtils.isNotEmpty(allSessions)) {
                List<GlobalSession> removeGlobalSessions = new ArrayList<>();
                Iterator<GlobalSession> iterator = allSessions.iterator();
                while (iterator.hasNext()) {
                    GlobalSession globalSession = iterator.next();
                    GlobalStatus globalStatus = globalSession.getStatus();
                    // 通过属性来判断处理的方式
                    switch (globalStatus) {
                        case UnKnown:
                        case Committed:
                        case CommitFailed:
                        case Rollbacked:
                        case RollbackFailed:
                        case TimeoutRollbacked:
                        case TimeoutRollbackFailed:
                        case Finished:
                            removeGlobalSessions.add(globalSession);
                            break;
                        case AsyncCommitting:
                            if (storeMode == StoreMode.FILE) {
                                queueToAsyncCommitting(globalSession);
                            }
                            break;
                        default: {
                            // TODO : 此处的原理在后面说 Lock 逻辑的时候统一说
                            if (storeMode == StoreMode.FILE) {
                                lockBranchSessions(globalSession.getSortedBranches());
                                // 如果上述都没有 , 需要先处理分支事务
                                switch (globalStatus) {
                                    case Committing:
                                    case CommitRetrying:
                                        queueToRetryCommit(globalSession);
                                        break;
                                    case Rollbacking:
                                    case RollbackRetrying:
                                    case TimeoutRollbacking:
                                    case TimeoutRollbackRetrying:
                                        queueToRetryRollback(globalSession);
                                        break;
                                    case Begin:
                                        globalSession.setActive(true);
                                        break;
                                    default:
                                        throw new ShouldNeverHappenException("NOT properly handled " + globalStatus);
                                }
                            }
                            break;
                        }
                    }
                }
                for (GlobalSession globalSession : removeGlobalSessions) {
                    removeInErrorState(globalSession);
                }
            }
    }
    
    C- 以 Database 为例 ,allSessions 又如下操作
    String ROOT_SESSION_MANAGER_NAME = "root.data";
    String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
    
    public Collection<GlobalSession> allSessions() {
            // get by taskName
        if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
        } else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing}));
        } else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
                    GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
        } else {
            // all data
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
                    GlobalStatus.UnKnown, GlobalStatus.Begin,
                    GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
                    GlobalStatus.RollbackRetrying,
                    GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting}));
        }
    }
    

    [Pro] : Reloadable 对象体系

    public interface Reloadable {
        void reload();
    }
    
    // 这里的实现类主要为 FileSessionManager
    

    五 . 扩展知识点

    5.1 LoadLevel 的作用

    @LoadLevel(name = "file", scope = Scope.PROTOTYPE)

    
    LoadLevel 注解提供了三个参数 :
    
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface LoadLevel {
    
        String name();
    
        // 在类似链式调用的过程中 , 可以对 Provider 进行排序
        int order() default 0;
    
        Scope scope() default Scope.SINGLETON;
    }
    
    // 作用域范围
    public enum Scope {
    
        SINGLETON,
        PROTOTYPE
    
    }
    

    总结

    LoadLevel 和 MATA-INF 真正的作用是用于扩展不同的数据库 , 后续等 seata 梳理完成后 , 再来看一下如何进行定制.

    自此 Session 的处理类初始化完成 , 后面来看一下 Session 在调用过程中的处理和数据库处理

    相关文章

      网友评论

        本文标题:盘点 Seata : Server 端事务的 Session 如

        本文链接:https://www.haomeiwen.com/subject/yfgiertx.html