美文网首页PostgreSQL
PostgreSQL 源码解读(120)- MVCC#5(获取事

PostgreSQL 源码解读(120)- MVCC#5(获取事

作者: EthanHe | 来源:发表于2020-07-16 19:25 被阅读0次

    本节介绍了PostgreSQL获取事务号XID的逻辑,主要解析了函数AssignTransactionId的实现逻辑。

    一、数据结构

    静态变量
    当前事务状态CurrentTransactionState

    
    /*
     * CurrentTransactionState always points to the current transaction state
     * block.  It will point to TopTransactionStateData when not in a
     * transaction at all, or when in a top-level transaction.
     * CurrentTransactionState通常指向当前事务块.
     * 如不处于事务中或者处于顶层事务中,则指向TopTransactionStateData
     */
    static TransactionStateData TopTransactionStateData = {
        .state = TRANS_DEFAULT,
        .blockState = TBLOCK_DEFAULT,
    };
    
    /*
     * unreportedXids holds XIDs of all subtransactions that have not yet been
     * reported in an XLOG_XACT_ASSIGNMENT record.
     * unreportedXids保存所有尚未在XLOG_XACT_ASSIGNMENT记录的子事务.
     */
    static int  nUnreportedXids;
    static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
    
    static TransactionState CurrentTransactionState = &TopTransactionStateData;
    
    /*
     * The subtransaction ID and command ID assignment counters are global
     * to a whole transaction, so we do not keep them in the state stack.
     * subtransaction ID和command ID全局计数器,对事务可见,在state栈中不记录这些信息.
     */
    static SubTransactionId currentSubTransactionId;
    static CommandId currentCommandId;
    static bool currentCommandIdUsed;
     
    

    TransactionState
    事务状态结构体

    /*
     *  transaction states - transaction state from server perspective
     *  事务状态枚举 - 服务器视角的事务状态
     */
    typedef enum TransState
    {
        TRANS_DEFAULT,              /* idle 空闲 */
        TRANS_START,                /* transaction starting 事务启动 */
        TRANS_INPROGRESS,           /* inside a valid transaction 进行中 */
        TRANS_COMMIT,               /* commit in progress 提交中 */
        TRANS_ABORT,                /* abort in progress 回滚中 */
        TRANS_PREPARE               /* prepare in progress 准备中 */
    } TransState;
    
    /*
     *  transaction block states - transaction state of client queries
     *  事务块状态 - 客户端查询的事务状态
     *
     * Note: the subtransaction states are used only for non-topmost
     * transactions; the others appear only in the topmost transaction.
     * 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
     */
    typedef enum TBlockState
    {
        /* not-in-transaction-block states 未进入事务块状态 */
        TBLOCK_DEFAULT,             /* idle 空闲  */
        TBLOCK_STARTED,             /* running single-query transaction 单个查询事务 */
    
        /* transaction block states 事务块状态 */
        TBLOCK_BEGIN,               /* starting transaction block 开始事务块 */
        TBLOCK_INPROGRESS,          /* live transaction 进行中 */
        TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
        TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
        TBLOCK_END,                 /* COMMIT received 接收到COMMIT */
        TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
        TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
        TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
        TBLOCK_PREPARE,             /* live xact, PREPARE received 进行中,接收到PREPARE */
    
        /* subtransaction states 子事务状态 */
        TBLOCK_SUBBEGIN,            /* starting a subtransaction 开启 */
        TBLOCK_SUBINPROGRESS,       /* live subtransaction 进行中 */
        TBLOCK_SUBRELEASE,          /* RELEASE received 接收到RELEASE */
        TBLOCK_SUBCOMMIT,           /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
        TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
        TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
        TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
        TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
        TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
    } TBlockState;
    
    /*
     *  transaction state structure
     *  事务状态结构体
     */
    typedef struct TransactionStateData
    {
        //事务ID
        TransactionId transactionId;    /* my XID, or Invalid if none */
        //子事务ID
        SubTransactionId subTransactionId;  /* my subxact ID */
        //保存点名称
        char       *name;           /* savepoint name, if any */
        //保存点级别
        int         savepointLevel; /* savepoint level */
        //低级别的事务状态
        TransState  state;          /* low-level state */
        //高级别的事务状态
        TBlockState blockState;     /* high-level state */
        //事务嵌套深度
        int         nestingLevel;   /* transaction nesting depth */
        //GUC上下文嵌套深度
        int         gucNestLevel;   /* GUC context nesting depth */
        //事务生命周期上下文
        MemoryContext curTransactionContext;    /* my xact-lifetime context */
        //查询资源
        ResourceOwner curTransactionOwner;  /* my query resources */
        //按XID顺序保存的已提交的子事务ID
        TransactionId *childXids;   /* subcommitted child XIDs, in XID order */
        //childXids数组大小
        int         nChildXids;     /* # of subcommitted child XIDs */
        //分配的childXids数组空间
        int         maxChildXids;   /* allocated size of childXids[] */
        //上一个CurrentUserId
        Oid         prevUser;       /* previous CurrentUserId setting */
        //上一个SecurityRestrictionContext
        int         prevSecContext; /* previous SecurityRestrictionContext */
        //上一事务是否只读?
        bool        prevXactReadOnly;   /* entry-time xact r/o state */
        //是否处于Recovery?
        bool        startedInRecovery;  /* did we start in recovery? */
        //XID是否已保存在WAL Record中?
        bool        didLogXid;      /* has xid been included in WAL record? */
        //Enter/ExitParallelMode计数器
        int         parallelModeLevel;  /* Enter/ExitParallelMode counter */
        //父事务状态
        struct TransactionStateData *parent;    /* back link to parent */
    } TransactionStateData;
    
    //结构体指针
    typedef TransactionStateData *TransactionState;
    
    

    二、源码解读

    AssignTransactionId函数,给定的TransactionState分配一个新的持久化事务号XID,在此函数调用前,不会为事务分配XIDs.

    /*
     * AssignTransactionId
     *
     * Assigns a new permanent XID to the given TransactionState.
     * We do not assign XIDs to transactions until/unless this is called.
     * Also, any parent TransactionStates that don't yet have XIDs are assigned
     * one; this maintains the invariant that a child transaction has an XID
     * following its parent's.
     * 为给定的TransactionState分配一个新的持久化事务号XID.
     * 在此函数调用前,我们不会为事务分配XIDs.
     * 同时,所有尚未获得XIDs的父TransactionStates也会分配事务号,
     *   这可以确保子事务的事务号在父事务之后.
     */
    static void
    AssignTransactionId(TransactionState s)
    {
        bool        isSubXact = (s->parent != NULL);
        ResourceOwner currentOwner;
        bool        log_unknown_top = false;
    
        /* Assert that caller didn't screw up */
        //确保调用者没有搞砸
        Assert(!TransactionIdIsValid(s->transactionId));
        Assert(s->state == TRANS_INPROGRESS);
    
        /*
         * Workers synchronize transaction state at the beginning of each parallel
         * operation, so we can't account for new XIDs at this point.
         * 在每个并行操作前,Parallel Workers同步事务状态,
         *   因此我们不能在这时候请求XIDs
         */
        if (IsInParallelMode() || IsParallelWorker())
            elog(ERROR, "cannot assign XIDs during a parallel operation");
    
        /*
         * Ensure parent(s) have XIDs, so that a child always has an XID later
         * than its parent.  Mustn't recurse here, or we might get a stack overflow
         * if we're at the bottom of a huge stack of subtransactions none of which
         * have XIDs yet.
         * 确保父事务已分配XIDs,这样可以确保子事务的XID后于父事务.
         * 不能在这里递归执行,否则如果我们在一个未分配XID子事务大栈的底部,可能会遇到栈溢出
         */
        if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
        {
            TransactionState p = s->parent;
            TransactionState *parents;
            size_t      parentOffset = 0;
    
            parents = palloc(sizeof(TransactionState) * s->nestingLevel);
            while (p != NULL && !TransactionIdIsValid(p->transactionId))
            {
                parents[parentOffset++] = p;
                p = p->parent;
            }
    
            /*
             * This is technically a recursive call, but the recursion will never
             * be more than one layer deep.
             * 递归调用,但递归不会超过一层
             */
            while (parentOffset != 0)
                AssignTransactionId(parents[--parentOffset]);
    
            pfree(parents);
        }
    
        /*
         * When wal_level=logical, guarantee that a subtransaction's xid can only
         * be seen in the WAL stream if its toplevel xid has been logged before.
         * If necessary we log an xact_assignment record with fewer than
         * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
         * for a transaction even though it appears in a WAL record, we just might
         * superfluously log something. That can happen when an xid is included
         * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
         * xl_standby_locks.
         * 如wal_level=logical,确保子事务XID在顶层XID已被"日志"的情况只可以被WAL stream看到.
         * 如果有必要,我们将使用小于PGPROC_MAX_CACHED_SUBXIDS的值来记录xact_assignment记录.
         * 请注意,即使didLogXid出现在WAL记录中,如果它没有为事务设置,也没有问题,
         *   我们可能只是多余地记录了一些东西。
         * 当一个xid出现在WAL Record中的某个地方,但不在XLogRecord->xl_xid中
         *   (比如在xl_standby_locks中)时,就会发生这种情况。
         */
        if (isSubXact && XLogLogicalInfoActive() &&
            !TopTransactionStateData.didLogXid)
            log_unknown_top = true;
    
        /*
         * Generate a new Xid and record it in PG_PROC and pg_subtrans.
         * 生成一个新的XID,并记录在PG_PROC和pg_subtrans中
         *
         * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
         * shared storage other than PG_PROC; because if there's no room for it in
         * PG_PROC, the subtrans entry is needed to ensure that other backends see
         * the Xid as "running".  See GetNewTransactionId.
         * 注意:我们必须在Xid出现在除PG_PROC之外的共享存储之前构造subtrans条目.
         * 因为如果在PG_PROC没有空闲空间,子事务条目需要确保其他进程看到该XID正在运行.
         * 参考函数GetNewTransactionId说明.
         * 
         */
        s->transactionId = GetNewTransactionId(isSubXact);
        if (!isSubXact)
            XactTopTransactionId = s->transactionId;
    
        if (isSubXact)
            SubTransSetParent(s->transactionId, s->parent->transactionId);
    
        /*
         * If it's a top-level transaction, the predicate locking system needs to
         * be told about it too.
         * 如为顶层事务,谓词锁系统也需要了解此事务.
         */
        if (!isSubXact)
            RegisterPredicateLockingXid(s->transactionId);
    
        /*
         * Acquire lock on the transaction XID.  (We assume this cannot block.) We
         * have to ensure that the lock is assigned to the transaction's own
         * ResourceOwner.
         * 请求锁(我们假定这样做不好引起阻塞).我们必须确保锁已被分配给事务自己的ResourceOwner.
         */
        currentOwner = CurrentResourceOwner;
        CurrentResourceOwner = s->curTransactionOwner;
    
        XactLockTableInsert(s->transactionId);
    
        CurrentResourceOwner = currentOwner;
    
        /*
         * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
         * top-level transaction we issue a WAL record for the assignment. We
         * include the top-level xid and all the subxids that have not yet been
         * reported using XLOG_XACT_ASSIGNMENT records.
         * 在每个顶级事务中分配的每个PGPROC_MAX_CACHED_SUBXIDS事务id,
         *   我们都会为分配记录一条WAL记录。
         * 该记录包括顶级的xid和所有尚未使用XLOG_XACT_ASSIGNMENT记录报告的子xid。
         *
         * This is required to limit the amount of shared memory required in a hot
         * standby server to keep track of in-progress XIDs. See notes for
         * RecordKnownAssignedTransactionIds().
         * 在跟踪进行中的XIDs的备机上,需要控制共享内存的大小.
         * 参见RecordKnownAssignedTransactionIds()函数说明.
         *
         * We don't keep track of the immediate parent of each subxid, only the
         * top-level transaction that each subxact belongs to. This is correct in
         * recovery only because aborted subtransactions are separately WAL
         * logged.
         * 我们不需要跟踪父事务的每个子事务,只需要跟踪子事务归属的顶层事务即可.
         * 这样可行是因为在恢复中,已回滚的子事务是通过WAL单独记录的.
         *
         * This is correct even for the case where several levels above us didn't
         * have an xid assigned as we recursed up to them beforehand.
         * 即使在我们之前递归到上面的几个级别时没有分配xid的情况下,这也是正确的。
         */
        if (isSubXact && XLogStandbyInfoActive())
        {
            unreportedXids[nUnreportedXids] = s->transactionId;
            nUnreportedXids++;
    
            /*
             * ensure this test matches similar one in
             * RecoverPreparedTransactions()
             * 确保在RecoverPreparedTransactions()中可以匹配到相似的.
             */
            if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
                log_unknown_top)
            {
                xl_xact_assignment xlrec;
    
                /*
                 * xtop is always set by now because we recurse up transaction
                 * stack to the highest unassigned xid and then come back down
                 * xtop现在已经设置好了,因为我们将事务堆栈递归到最高的未分配的xid,然后再返回
                 */
                xlrec.xtop = GetTopTransactionId();
                Assert(TransactionIdIsValid(xlrec.xtop));
                xlrec.nsubxacts = nUnreportedXids;
    
                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment);
                XLogRegisterData((char *) unreportedXids,
                                 nUnreportedXids * sizeof(TransactionId));
    
                (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT);
    
                nUnreportedXids = 0;
                /* mark top, not current xact as having been logged */
                //标记为最顶层,而不是当前已记录日志的xact
                TopTransactionStateData.didLogXid = true;
            }
        }
    }
    
    

    三、跟踪分析

    执行txid_current,触发函数调用

    11:10:36 (xdb@[local]:5432)testdb=# begin;
    BEGIN
    11:40:20 (xdb@[local]:5432)testdb=#* select txid_current_if_assigned();
     txid_current_if_assigned 
    --------------------------
                             
    (1 row)
    
    11:40:43 (xdb@[local]:5432)testdb=#* select txid_current();
    
    

    启动gdb,设置断点

    (gdb) b AssignTransactionId
    Breakpoint 5 at 0x546a4c: file xact.c, line 491.
    (gdb) c
    Continuing.
    
    Breakpoint 5, AssignTransactionId (s=0xf9c720 <TopTransactionStateData>) at xact.c:491
    491     bool        isSubXact = (s->parent != NULL);
    (gdb) 
    

    查看调用栈

    (gdb) bt
    #0  AssignTransactionId (s=0xf9c720 <TopTransactionStateData>) at xact.c:491
    #1  0x000000000054693d in GetTopTransactionId () at xact.c:392
    #2  0x00000000009fe1f3 in txid_current (fcinfo=0x25835a0) at txid.c:443
    #3  0x00000000006cfebd in ExecInterpExpr (state=0x25834b8, econtext=0x25831a8, isnull=0x7ffe3d4a31f7)
        at execExprInterp.c:654
    #4  0x00000000006d1ac6 in ExecInterpExprStillValid (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
        at execExprInterp.c:1786
    #5  0x00000000007140dd in ExecEvalExprSwitchContext (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
        at ../../../src/include/executor/executor.h:303
    #6  0x000000000071414b in ExecProject (projInfo=0x25834b0) at ../../../src/include/executor/executor.h:337
    #7  0x0000000000714323 in ExecResult (pstate=0x2583090) at nodeResult.c:136
    #8  0x00000000006e4c30 in ExecProcNodeFirst (node=0x2583090) at execProcnode.c:445
    #9  0x00000000006d9974 in ExecProcNode (node=0x2583090) at ../../../src/include/executor/executor.h:237
    #10 0x00000000006dc22d in ExecutePlan (estate=0x2582e78, planstate=0x2583090, use_parallel_mode=false, 
        operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24cd0a0, 
        execute_once=true) at execMain.c:1723
    #11 0x00000000006d9f5c in standard_ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, 
        execute_once=true) at execMain.c:364
    #12 0x00000000006d9d7f in ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, execute_once=true)
        at execMain.c:307
    #13 0x00000000008ccf5a in PortalRunSelect (portal=0x250c748, forward=true, count=0, dest=0x24cd0a0) at pquery.c:932
    #14 0x00000000008ccbf3 in PortalRun (portal=0x250c748, count=9223372036854775807, isTopLevel=true, run_once=true, 
        dest=0x24cd0a0, altdest=0x24cd0a0, completionTag=0x7ffe3d4a3570 "") at pquery.c:773
    #15 0x00000000008c6b1e in exec_simple_query (query_string=0x24a6ec8 "select txid_current();") at postgres.c:1145
    #16 0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")
        at postgres.c:4182
        #17 0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361
    ---Type <return> to continue, or q <return> to quit---
    #18 0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033
    #19 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
    #20 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379
    #21 0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228
    

    输入参数TransactionState(全局变量,指向TopTransactionStateData)

    (gdb) p s
    $13 = (TransactionState) 0xf9c720 <TopTransactionStateData>
    (gdb) p *s
    $14 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS, 
      blockState = TBLOCK_INPROGRESS, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850, 
      curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, 
      prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}
    (gdb) 
    

    初始化部分变量并验证

    (gdb) n
    493     bool        log_unknown_top = false;
    (gdb) 
    496     Assert(!TransactionIdIsValid(s->transactionId));
    (gdb) 
    497     Assert(s->state == TRANS_INPROGRESS);
    (gdb) 
    

    获取事务号

    (gdb) 
    503     if (IsInParallelMode() || IsParallelWorker())
    (gdb) 
    512     if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
    (gdb) 
    545     if (isSubXact && XLogLogicalInfoActive() &&
    (gdb) 
    557     s->transactionId = GetNewTransactionId(isSubXact);
    (gdb) 
    558     if (!isSubXact)
    (gdb) p s->transactionId
    $15 = 2407
    (gdb) 
    

    注册,并设置其他信息

    (gdb) n
    559         XactTopTransactionId = s->transactionId;
    (gdb) 
    561     if (isSubXact)
    (gdb) 
    568     if (!isSubXact)
    (gdb) 
    569         RegisterPredicateLockingXid(s->transactionId);
    (gdb) 
    576     currentOwner = CurrentResourceOwner;
    (gdb) 
    577     CurrentResourceOwner = s->curTransactionOwner;
    (gdb) 
    579     XactLockTableInsert(s->transactionId);
    (gdb) 
    581     CurrentResourceOwner = currentOwner;
    (gdb) 
    601     if (isSubXact && XLogStandbyInfoActive())
    (gdb) 
    635 }
    

    完成调用

    (gdb) 
    GetTopTransactionId () at xact.c:393
    393     return XactTopTransactionId;
    (gdb) 
    

    DONE!

    四、参考资料

    PG Source Code

    相关文章

      网友评论

        本文标题:PostgreSQL 源码解读(120)- MVCC#5(获取事

        本文链接:https://www.haomeiwen.com/subject/csymhktx.html