本节介绍了PostgreSQL获取事务号XID的逻辑,主要解析了函数AssignTransactionId的实现逻辑。
一、数据结构
静态变量
当前事务状态CurrentTransactionState
/*
* CurrentTransactionState always points to the current transaction state
* block. It will point to TopTransactionStateData when not in a
* transaction at all, or when in a top-level transaction.
* CurrentTransactionState通常指向当前事务块.
* 如不处于事务中或者处于顶层事务中,则指向TopTransactionStateData
*/
static TransactionStateData TopTransactionStateData = {
.state = TRANS_DEFAULT,
.blockState = TBLOCK_DEFAULT,
};
/*
* unreportedXids holds XIDs of all subtransactions that have not yet been
* reported in an XLOG_XACT_ASSIGNMENT record.
* unreportedXids保存所有尚未在XLOG_XACT_ASSIGNMENT记录的子事务.
*/
static int nUnreportedXids;
static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
static TransactionState CurrentTransactionState = &TopTransactionStateData;
/*
* The subtransaction ID and command ID assignment counters are global
* to a whole transaction, so we do not keep them in the state stack.
* subtransaction ID和command ID全局计数器,对事务可见,在state栈中不记录这些信息.
*/
static SubTransactionId currentSubTransactionId;
static CommandId currentCommandId;
static bool currentCommandIdUsed;
TransactionState
事务状态结构体
/*
* transaction states - transaction state from server perspective
* 事务状态枚举 - 服务器视角的事务状态
*/
typedef enum TransState
{
TRANS_DEFAULT, /* idle 空闲 */
TRANS_START, /* transaction starting 事务启动 */
TRANS_INPROGRESS, /* inside a valid transaction 进行中 */
TRANS_COMMIT, /* commit in progress 提交中 */
TRANS_ABORT, /* abort in progress 回滚中 */
TRANS_PREPARE /* prepare in progress 准备中 */
} TransState;
/*
* transaction block states - transaction state of client queries
* 事务块状态 - 客户端查询的事务状态
*
* Note: the subtransaction states are used only for non-topmost
* transactions; the others appear only in the topmost transaction.
* 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
*/
typedef enum TBlockState
{
/* not-in-transaction-block states 未进入事务块状态 */
TBLOCK_DEFAULT, /* idle 空闲 */
TBLOCK_STARTED, /* running single-query transaction 单个查询事务 */
/* transaction block states 事务块状态 */
TBLOCK_BEGIN, /* starting transaction block 开始事务块 */
TBLOCK_INPROGRESS, /* live transaction 进行中 */
TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
TBLOCK_END, /* COMMIT received 接收到COMMIT */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_PREPARE, /* live xact, PREPARE received 进行中,接收到PREPARE */
/* subtransaction states 子事务状态 */
TBLOCK_SUBBEGIN, /* starting a subtransaction 开启 */
TBLOCK_SUBINPROGRESS, /* live subtransaction 进行中 */
TBLOCK_SUBRELEASE, /* RELEASE received 接收到RELEASE */
TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
} TBlockState;
/*
* transaction state structure
* 事务状态结构体
*/
typedef struct TransactionStateData
{
//事务ID
TransactionId transactionId; /* my XID, or Invalid if none */
//子事务ID
SubTransactionId subTransactionId; /* my subxact ID */
//保存点名称
char *name; /* savepoint name, if any */
//保存点级别
int savepointLevel; /* savepoint level */
//低级别的事务状态
TransState state; /* low-level state */
//高级别的事务状态
TBlockState blockState; /* high-level state */
//事务嵌套深度
int nestingLevel; /* transaction nesting depth */
//GUC上下文嵌套深度
int gucNestLevel; /* GUC context nesting depth */
//事务生命周期上下文
MemoryContext curTransactionContext; /* my xact-lifetime context */
//查询资源
ResourceOwner curTransactionOwner; /* my query resources */
//按XID顺序保存的已提交的子事务ID
TransactionId *childXids; /* subcommitted child XIDs, in XID order */
//childXids数组大小
int nChildXids; /* # of subcommitted child XIDs */
//分配的childXids数组空间
int maxChildXids; /* allocated size of childXids[] */
//上一个CurrentUserId
Oid prevUser; /* previous CurrentUserId setting */
//上一个SecurityRestrictionContext
int prevSecContext; /* previous SecurityRestrictionContext */
//上一事务是否只读?
bool prevXactReadOnly; /* entry-time xact r/o state */
//是否处于Recovery?
bool startedInRecovery; /* did we start in recovery? */
//XID是否已保存在WAL Record中?
bool didLogXid; /* has xid been included in WAL record? */
//Enter/ExitParallelMode计数器
int parallelModeLevel; /* Enter/ExitParallelMode counter */
//父事务状态
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;
二、源码解读
AssignTransactionId函数,给定的TransactionState分配一个新的持久化事务号XID,在此函数调用前,不会为事务分配XIDs.
/*
* AssignTransactionId
*
* Assigns a new permanent XID to the given TransactionState.
* We do not assign XIDs to transactions until/unless this is called.
* Also, any parent TransactionStates that don't yet have XIDs are assigned
* one; this maintains the invariant that a child transaction has an XID
* following its parent's.
* 为给定的TransactionState分配一个新的持久化事务号XID.
* 在此函数调用前,我们不会为事务分配XIDs.
* 同时,所有尚未获得XIDs的父TransactionStates也会分配事务号,
* 这可以确保子事务的事务号在父事务之后.
*/
static void
AssignTransactionId(TransactionState s)
{
bool isSubXact = (s->parent != NULL);
ResourceOwner currentOwner;
bool log_unknown_top = false;
/* Assert that caller didn't screw up */
//确保调用者没有搞砸
Assert(!TransactionIdIsValid(s->transactionId));
Assert(s->state == TRANS_INPROGRESS);
/*
* Workers synchronize transaction state at the beginning of each parallel
* operation, so we can't account for new XIDs at this point.
* 在每个并行操作前,Parallel Workers同步事务状态,
* 因此我们不能在这时候请求XIDs
*/
if (IsInParallelMode() || IsParallelWorker())
elog(ERROR, "cannot assign XIDs during a parallel operation");
/*
* Ensure parent(s) have XIDs, so that a child always has an XID later
* than its parent. Mustn't recurse here, or we might get a stack overflow
* if we're at the bottom of a huge stack of subtransactions none of which
* have XIDs yet.
* 确保父事务已分配XIDs,这样可以确保子事务的XID后于父事务.
* 不能在这里递归执行,否则如果我们在一个未分配XID子事务大栈的底部,可能会遇到栈溢出
*/
if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
{
TransactionState p = s->parent;
TransactionState *parents;
size_t parentOffset = 0;
parents = palloc(sizeof(TransactionState) * s->nestingLevel);
while (p != NULL && !TransactionIdIsValid(p->transactionId))
{
parents[parentOffset++] = p;
p = p->parent;
}
/*
* This is technically a recursive call, but the recursion will never
* be more than one layer deep.
* 递归调用,但递归不会超过一层
*/
while (parentOffset != 0)
AssignTransactionId(parents[--parentOffset]);
pfree(parents);
}
/*
* When wal_level=logical, guarantee that a subtransaction's xid can only
* be seen in the WAL stream if its toplevel xid has been logged before.
* If necessary we log an xact_assignment record with fewer than
* PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
* for a transaction even though it appears in a WAL record, we just might
* superfluously log something. That can happen when an xid is included
* somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
* xl_standby_locks.
* 如wal_level=logical,确保子事务XID在顶层XID已被"日志"的情况只可以被WAL stream看到.
* 如果有必要,我们将使用小于PGPROC_MAX_CACHED_SUBXIDS的值来记录xact_assignment记录.
* 请注意,即使didLogXid出现在WAL记录中,如果它没有为事务设置,也没有问题,
* 我们可能只是多余地记录了一些东西。
* 当一个xid出现在WAL Record中的某个地方,但不在XLogRecord->xl_xid中
* (比如在xl_standby_locks中)时,就会发生这种情况。
*/
if (isSubXact && XLogLogicalInfoActive() &&
!TopTransactionStateData.didLogXid)
log_unknown_top = true;
/*
* Generate a new Xid and record it in PG_PROC and pg_subtrans.
* 生成一个新的XID,并记录在PG_PROC和pg_subtrans中
*
* NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
* shared storage other than PG_PROC; because if there's no room for it in
* PG_PROC, the subtrans entry is needed to ensure that other backends see
* the Xid as "running". See GetNewTransactionId.
* 注意:我们必须在Xid出现在除PG_PROC之外的共享存储之前构造subtrans条目.
* 因为如果在PG_PROC没有空闲空间,子事务条目需要确保其他进程看到该XID正在运行.
* 参考函数GetNewTransactionId说明.
*
*/
s->transactionId = GetNewTransactionId(isSubXact);
if (!isSubXact)
XactTopTransactionId = s->transactionId;
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId);
/*
* If it's a top-level transaction, the predicate locking system needs to
* be told about it too.
* 如为顶层事务,谓词锁系统也需要了解此事务.
*/
if (!isSubXact)
RegisterPredicateLockingXid(s->transactionId);
/*
* Acquire lock on the transaction XID. (We assume this cannot block.) We
* have to ensure that the lock is assigned to the transaction's own
* ResourceOwner.
* 请求锁(我们假定这样做不好引起阻塞).我们必须确保锁已被分配给事务自己的ResourceOwner.
*/
currentOwner = CurrentResourceOwner;
CurrentResourceOwner = s->curTransactionOwner;
XactLockTableInsert(s->transactionId);
CurrentResourceOwner = currentOwner;
/*
* Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
* top-level transaction we issue a WAL record for the assignment. We
* include the top-level xid and all the subxids that have not yet been
* reported using XLOG_XACT_ASSIGNMENT records.
* 在每个顶级事务中分配的每个PGPROC_MAX_CACHED_SUBXIDS事务id,
* 我们都会为分配记录一条WAL记录。
* 该记录包括顶级的xid和所有尚未使用XLOG_XACT_ASSIGNMENT记录报告的子xid。
*
* This is required to limit the amount of shared memory required in a hot
* standby server to keep track of in-progress XIDs. See notes for
* RecordKnownAssignedTransactionIds().
* 在跟踪进行中的XIDs的备机上,需要控制共享内存的大小.
* 参见RecordKnownAssignedTransactionIds()函数说明.
*
* We don't keep track of the immediate parent of each subxid, only the
* top-level transaction that each subxact belongs to. This is correct in
* recovery only because aborted subtransactions are separately WAL
* logged.
* 我们不需要跟踪父事务的每个子事务,只需要跟踪子事务归属的顶层事务即可.
* 这样可行是因为在恢复中,已回滚的子事务是通过WAL单独记录的.
*
* This is correct even for the case where several levels above us didn't
* have an xid assigned as we recursed up to them beforehand.
* 即使在我们之前递归到上面的几个级别时没有分配xid的情况下,这也是正确的。
*/
if (isSubXact && XLogStandbyInfoActive())
{
unreportedXids[nUnreportedXids] = s->transactionId;
nUnreportedXids++;
/*
* ensure this test matches similar one in
* RecoverPreparedTransactions()
* 确保在RecoverPreparedTransactions()中可以匹配到相似的.
*/
if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
log_unknown_top)
{
xl_xact_assignment xlrec;
/*
* xtop is always set by now because we recurse up transaction
* stack to the highest unassigned xid and then come back down
* xtop现在已经设置好了,因为我们将事务堆栈递归到最高的未分配的xid,然后再返回
*/
xlrec.xtop = GetTopTransactionId();
Assert(TransactionIdIsValid(xlrec.xtop));
xlrec.nsubxacts = nUnreportedXids;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment);
XLogRegisterData((char *) unreportedXids,
nUnreportedXids * sizeof(TransactionId));
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT);
nUnreportedXids = 0;
/* mark top, not current xact as having been logged */
//标记为最顶层,而不是当前已记录日志的xact
TopTransactionStateData.didLogXid = true;
}
}
}
三、跟踪分析
执行txid_current,触发函数调用
11:10:36 (xdb@[local]:5432)testdb=# begin;
BEGIN
11:40:20 (xdb@[local]:5432)testdb=#* select txid_current_if_assigned();
txid_current_if_assigned
--------------------------
(1 row)
11:40:43 (xdb@[local]:5432)testdb=#* select txid_current();
启动gdb,设置断点
(gdb) b AssignTransactionId
Breakpoint 5 at 0x546a4c: file xact.c, line 491.
(gdb) c
Continuing.
Breakpoint 5, AssignTransactionId (s=0xf9c720 <TopTransactionStateData>) at xact.c:491
491 bool isSubXact = (s->parent != NULL);
(gdb)
查看调用栈
(gdb) bt
#0 AssignTransactionId (s=0xf9c720 <TopTransactionStateData>) at xact.c:491
#1 0x000000000054693d in GetTopTransactionId () at xact.c:392
#2 0x00000000009fe1f3 in txid_current (fcinfo=0x25835a0) at txid.c:443
#3 0x00000000006cfebd in ExecInterpExpr (state=0x25834b8, econtext=0x25831a8, isnull=0x7ffe3d4a31f7)
at execExprInterp.c:654
#4 0x00000000006d1ac6 in ExecInterpExprStillValid (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
at execExprInterp.c:1786
#5 0x00000000007140dd in ExecEvalExprSwitchContext (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
at ../../../src/include/executor/executor.h:303
#6 0x000000000071414b in ExecProject (projInfo=0x25834b0) at ../../../src/include/executor/executor.h:337
#7 0x0000000000714323 in ExecResult (pstate=0x2583090) at nodeResult.c:136
#8 0x00000000006e4c30 in ExecProcNodeFirst (node=0x2583090) at execProcnode.c:445
#9 0x00000000006d9974 in ExecProcNode (node=0x2583090) at ../../../src/include/executor/executor.h:237
#10 0x00000000006dc22d in ExecutePlan (estate=0x2582e78, planstate=0x2583090, use_parallel_mode=false,
operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24cd0a0,
execute_once=true) at execMain.c:1723
#11 0x00000000006d9f5c in standard_ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0,
execute_once=true) at execMain.c:364
#12 0x00000000006d9d7f in ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, execute_once=true)
at execMain.c:307
#13 0x00000000008ccf5a in PortalRunSelect (portal=0x250c748, forward=true, count=0, dest=0x24cd0a0) at pquery.c:932
#14 0x00000000008ccbf3 in PortalRun (portal=0x250c748, count=9223372036854775807, isTopLevel=true, run_once=true,
dest=0x24cd0a0, altdest=0x24cd0a0, completionTag=0x7ffe3d4a3570 "") at pquery.c:773
#15 0x00000000008c6b1e in exec_simple_query (query_string=0x24a6ec8 "select txid_current();") at postgres.c:1145
#16 0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")
at postgres.c:4182
#17 0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361
---Type <return> to continue, or q <return> to quit---
#18 0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033
#19 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#20 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379
#21 0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228
输入参数TransactionState(全局变量,指向TopTransactionStateData)
(gdb) p s
$13 = (TransactionState) 0xf9c720 <TopTransactionStateData>
(gdb) p *s
$14 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS,
blockState = TBLOCK_INPROGRESS, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850,
curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0,
prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}
(gdb)
初始化部分变量并验证
(gdb) n
493 bool log_unknown_top = false;
(gdb)
496 Assert(!TransactionIdIsValid(s->transactionId));
(gdb)
497 Assert(s->state == TRANS_INPROGRESS);
(gdb)
获取事务号
(gdb)
503 if (IsInParallelMode() || IsParallelWorker())
(gdb)
512 if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
(gdb)
545 if (isSubXact && XLogLogicalInfoActive() &&
(gdb)
557 s->transactionId = GetNewTransactionId(isSubXact);
(gdb)
558 if (!isSubXact)
(gdb) p s->transactionId
$15 = 2407
(gdb)
注册,并设置其他信息
(gdb) n
559 XactTopTransactionId = s->transactionId;
(gdb)
561 if (isSubXact)
(gdb)
568 if (!isSubXact)
(gdb)
569 RegisterPredicateLockingXid(s->transactionId);
(gdb)
576 currentOwner = CurrentResourceOwner;
(gdb)
577 CurrentResourceOwner = s->curTransactionOwner;
(gdb)
579 XactLockTableInsert(s->transactionId);
(gdb)
581 CurrentResourceOwner = currentOwner;
(gdb)
601 if (isSubXact && XLogStandbyInfoActive())
(gdb)
635 }
完成调用
(gdb)
GetTopTransactionId () at xact.c:393
393 return XactTopTransactionId;
(gdb)
DONE!
网友评论