本节介绍了PostgreSQL启动事务的逻辑,主要内容是函数StartTransaction的实现逻辑。
一、数据结构
静态变量
当前事务状态CurrentTransactionState
/*
 * CurrentTransactionState always points to the current transaction state
 * block. It will point to TopTransactionStateData when not in a
 * transaction at all, or when in a top-level transaction.
 *
 * TopTransactionStateData is the statically allocated block used for that
 * case; it starts out idle at both levels (TRANS_DEFAULT / TBLOCK_DEFAULT).
 */
static TransactionStateData TopTransactionStateData = {
.state = TRANS_DEFAULT,
.blockState = TBLOCK_DEFAULT,
};
/*
 * unreportedXids holds XIDs of all subtransactions that have not yet been
 * reported in an XLOG_XACT_ASSIGNMENT record.
 *
 * nUnreportedXids is reset to 0 by StartTransaction(); presumably it is the
 * number of valid entries in unreportedXids[] -- confirm against the code
 * that fills the array.
 */
static int nUnreportedXids;
static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
/* Starts out pointing at the top-level transaction state block. */
static TransactionState CurrentTransactionState = &TopTransactionStateData;
/*
 * The subtransaction ID and command ID assignment counters are global
 * to a whole transaction, so we do not keep them in the state stack.
 * StartTransaction() reinitializes all three at the start of each
 * top-level transaction.
 */
static SubTransactionId currentSubTransactionId;
static CommandId currentCommandId;
static bool currentCommandIdUsed;
TransactionState
事务状态结构体
/*
 * transaction states - transaction state from server perspective
 *
 * This is the low-level state kept in TransactionStateData.state.
 * StartTransaction() moves it TRANS_DEFAULT -> TRANS_START ->
 * TRANS_INPROGRESS.
 */
typedef enum TransState
{
TRANS_DEFAULT, /* idle */
TRANS_START, /* transaction starting */
TRANS_INPROGRESS, /* inside a valid transaction */
TRANS_COMMIT, /* commit in progress */
TRANS_ABORT, /* abort in progress */
TRANS_PREPARE /* prepare in progress */
} TransState;
/*
 * transaction block states - transaction state of client queries
 *
 * This is the high-level state kept in TransactionStateData.blockState.
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 */
typedef enum TBlockState
{
/* not-in-transaction-block states */
TBLOCK_DEFAULT, /* idle */
TBLOCK_STARTED, /* running single-query transaction */
/* transaction block states */
TBLOCK_BEGIN, /* starting transaction block */
TBLOCK_INPROGRESS, /* live transaction */
TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */
TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
TBLOCK_END, /* COMMIT received */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */
TBLOCK_PREPARE, /* live xact, PREPARE received */
/* subtransaction states */
TBLOCK_SUBBEGIN, /* starting a subtransaction */
TBLOCK_SUBINPROGRESS, /* live subtransaction */
TBLOCK_SUBRELEASE, /* RELEASE received */
TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS */
TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK */
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */
TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */
} TBlockState;
/*
 * transaction state structure
 *
 * Describes one level of the transaction state stack. Each block carries a
 * back link to its parent; the top-level block (TopTransactionStateData)
 * is set up by StartTransaction() with nestingLevel = 1 and gucNestLevel = 1.
 */
typedef struct TransactionStateData
{
TransactionId transactionId; /* my XID, or Invalid if none */
SubTransactionId subTransactionId; /* my subxact ID */
char *name; /* savepoint name, if any */
int savepointLevel; /* savepoint level */
TransState state; /* low-level state */
TBlockState blockState; /* high-level state */
int nestingLevel; /* transaction nesting depth */
int gucNestLevel; /* GUC context nesting depth */
MemoryContext curTransactionContext; /* my xact-lifetime context */
ResourceOwner curTransactionOwner; /* my query resources */
TransactionId *childXids; /* subcommitted child XIDs, in XID order */
int nChildXids; /* # of subcommitted child XIDs */
int maxChildXids; /* allocated size of childXids[] */
Oid prevUser; /* previous CurrentUserId setting */
int prevSecContext; /* previous SecurityRestrictionContext */
// Per the note in StartTransaction(), not used at the outermost level.
bool prevXactReadOnly; /* entry-time xact r/o state */
// Set from RecoveryInProgress() when the transaction starts.
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
int parallelModeLevel; /* Enter/ExitParallelMode counter */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
// Pointer to a transaction state block.
typedef TransactionStateData *TransactionState;
VirtualTransactionId
VirtualTransactionId由执行事务的后台进程的BackendId和在该后台进程内本地分配的LocalTransactionId组成.
/*
 * Top-level transactions are identified by VirtualTransactionIDs comprising
 * the BackendId of the backend running the xact, plus a locally-assigned
 * LocalTransactionId. These are guaranteed unique over the short term,
 * but will be reused after a database restart; hence they should never
 * be stored on disk.
 *
 * Note that struct VirtualTransactionId can not be assumed to be atomically
 * assignable as a whole. However, type LocalTransactionId is assumed to
 * be atomically assignable, and the backend ID doesn't change often enough
 * to be a problem, so we can fetch or assign the two fields separately.
 * We deliberately refrain from using the struct within PGPROC, to prevent
 * coding errors from trying to use struct assignment with it; instead use
 * GET_VXID_FROM_PGPROC().
 */
typedef struct
{
BackendId backendId; /* determined at backend startup */
LocalTransactionId localTransactionId; /* backend-local transaction id */
} VirtualTransactionId;
二、源码解读
StartTransaction函数用于启动事务:先将事务状态置为TRANS_START,完成各项初始化后再设置为TRANS_INPROGRESS,即CurrentTransactionState->state = TRANS_INPROGRESS.
/*
 * StartTransaction
 *
 * Begin a top-level transaction.  Resets TopTransactionStateData and the
 * per-transaction file-level variables, sets up memory and resource-owner
 * state, assigns and advertises a virtual transaction id, fixes the
 * transaction start timestamp, starts the remaining subsystems, and finally
 * marks the transaction TRANS_INPROGRESS.
 *
 * The relative order of the calls below matters; see the notes on each step.
 */
static void
StartTransaction(void)
{
    TransactionState s = &TopTransactionStateData; /* top-level state block */
    VirtualTransactionId vxid;                     /* vxid assigned below */

    /* The state stack must be empty: make the top-level block current. */
    CurrentTransactionState = s;

    Assert(XactTopTransactionId == InvalidTransactionId);

    /* We may only start from the idle state. */
    Assert(s->state == TRANS_DEFAULT);

    /*
     * Switch to "starting".  Once the status is switched, nothing here may
     * fail until the user ID and security context flags have been fetched
     * further down.
     */
    s->state = TRANS_START;
    s->transactionId = InvalidTransactionId;    /* no XID until assigned */

    /*
     * Initialize the remaining fields of the state block.
     * (prevXactReadOnly is not used at the outermost level.)
     */
    s->gucNestLevel = 1;
    s->nestingLevel = 1;
    s->nChildXids = 0;
    s->maxChildXids = 0;
    s->childXids = NULL;

    /*
     * From the moment the current user ID and security context flags are
     * saved, both get properly reset even if transaction startup fails.
     */
    GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);

    /* SecurityRestrictionContext is never set outside a transaction. */
    Assert(s->prevSecContext == 0);

    /*
     * Reset the xact state variables.
     *
     * While recovery is still in progress the transaction is marked
     * read-only.  Lower-level defences in XLogInsert and elsewhere already
     * prevent data modification during recovery; setting the flag gives the
     * user the normal read-only indication.
     */
    if (!RecoveryInProgress())
    {
        s->startedInRecovery = false;
        XactReadOnly = DefaultXactReadOnly;
    }
    else
    {
        s->startedInRecovery = true;
        XactReadOnly = true;
    }
    XactDeferrable = DefaultXactDeferrable;
    XactIsoLevel = DefaultXactIsoLevel;
    forceSyncCommit = false;
    MyXactFlags = 0;

    /* Restart the within-transaction counters. */
    s->subTransactionId = TopSubTransactionId;
    currentSubTransactionId = TopSubTransactionId;
    currentCommandIdUsed = false;
    currentCommandId = FirstCommandId;

    /* Restart reported-xid accounting. */
    s->didLogXid = false;
    nUnreportedXids = 0;

    /* Resource management has to be set up before anything else. */
    AtStart_Memory();
    AtStart_ResourceOwner();

    /*
     * Build the virtual transaction id from our backend id and a freshly
     * assigned LocalTransactionId.
     */
    vxid.backendId = MyBackendId;
    vxid.localTransactionId = GetNextLocalTransactionId();

    /* Take the vxid lock before the id is announced in the proc array. */
    VirtualXactLockTableInsert(vxid);

    /*
     * Announce it in the proc array.  Assignment of LocalTransactionId is
     * assumed atomic, and the backendId should already be in place.
     */
    Assert(MyProc->backendId == vxid.backendId);
    MyProc->lxid = vxid.localTransactionId;

    TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);

    /*
     * Establish transaction_timestamp() (a/k/a now()).
     *
     * - In a parallel worker the value should already have been supplied by
     *   SetParallelStartTimestamps(), so only check that it is set.
     * - Inside a procedure (nonatomic SPI context) the timestamp has to
     *   advance, so read the clock.
     * - Otherwise reuse the first command's statement_timestamp() and avoid
     *   a fresh (and expensive) GetCurrentTimestamp() call.
     */
    if (IsParallelWorker())
    {
        Assert(xactStartTimestamp != 0);
    }
    else
    {
        xactStartTimestamp = SPI_inside_nonatomic_context()
            ? GetCurrentTimestamp()
            : stmtStartTimestamp;
    }
    pgstat_report_xact_timestamp(xactStartTimestamp);

    /* The stop timestamp is not known yet. */
    xactStopTimestamp = 0;

    /* Start the other subsystems for the new transaction. */
    AtStart_GUC();
    AtStart_Cache();
    AfterTriggerBeginXact();

    /* Startup is complete: the transaction is now in progress. */
    s->state = TRANS_INPROGRESS;

    ShowTransactionState("StartTransaction");
}
三、跟踪分析
执行begin,触发该函数调用
11:10:36 (xdb@[local]:5432)testdb=# begin;
启动gdb,设置断点
(gdb) b StartTransaction
Breakpoint 4 at 0x54800f: file xact.c, line 1825.
(gdb) c
Continuing.
Breakpoint 4, StartTransaction () at xact.c:1825
1825 s = &TopTransactionStateData;
(gdb)
查看调用栈
(gdb) bt
#0 StartTransaction () at xact.c:1825
#1 0x0000000000548f50 in StartTransactionCommand () at xact.c:2718
#2 0x00000000008c8e7d in start_xact_command () at postgres.c:2500
#3 0x00000000008c6771 in exec_simple_query (query_string=0x24a6ec8 "begin;") at postgres.c:948
#4 0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")
at postgres.c:4182
#5 0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361
#6 0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033
#7 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#8 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379
#9 0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228
(gdb)
查看TopTransactionStateData静态变量(尚未为本次事务重新初始化,部分字段仍保留之前的取值,如didLogXid = true)
(gdb) p TopTransactionStateData
$7 = {transactionId = 0, subTransactionId = 0, name = 0x0, savepointLevel = 0, state = TRANS_DEFAULT,
blockState = TBLOCK_DEFAULT, nestingLevel = 0, gucNestLevel = 0, curTransactionContext = 0x0, curTransactionOwner = 0x0,
childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, prevXactReadOnly = false,
startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}
设置静态变量CurrentTransactionState = &TopTransactionStateData;
(gdb) n
1826 CurrentTransactionState = s;
(gdb)
1828 Assert(XactTopTransactionId == InvalidTransactionId);
(gdb)
初始化事务状态(注:所调试的代码版本在此处使用if语句检查s->state,而第二节所列源码使用的是Assert,两者版本略有差异)
(gdb) n
1833 if (s->state != TRANS_DEFAULT)
(gdb)
1841 s->state = TRANS_START;
(gdb)
1842 s->transactionId = InvalidTransactionId; /* until assigned */
(gdb)
1852 if (RecoveryInProgress())
(gdb)
1859 s->startedInRecovery = false;
(gdb)
1860 XactReadOnly = DefaultXactReadOnly;
(gdb)
1862 XactDeferrable = DefaultXactDeferrable;
(gdb)
1863 XactIsoLevel = DefaultXactIsoLevel;
(gdb)
1864 forceSyncCommit = false;
(gdb)
1865 MyXactFlags = 0;
(gdb)
1870 s->subTransactionId = TopSubTransactionId;
(gdb)
1871 currentSubTransactionId = TopSubTransactionId;
(gdb)
1872 currentCommandId = FirstCommandId;
(gdb)
1873 currentCommandIdUsed = false;
(gdb)
1878 nUnreportedXids = 0;
(gdb)
1879 s->didLogXid = false;
(gdb)
1884 AtStart_Memory();
(gdb)
初始化资源管理相关的子系统(内存上下文/ResourceOwner);GUC/Cache等其他子系统在后面初始化
(gdb)
1884 AtStart_Memory();
(gdb) n
1885 AtStart_ResourceOwner();
(gdb)
设置虚拟事务ID
1891 vxid.backendId = MyBackendId;
(gdb)
1892 vxid.localTransactionId = GetNextLocalTransactionId();
(gdb)
1897 VirtualXactLockTableInsert(vxid);
(gdb)
1903 Assert(MyProc->backendId == vxid.backendId);
(gdb) p vxid
$8 = {backendId = 3, localTransactionId = 6}
(gdb)
(gdb) n
1904 MyProc->lxid = vxid.localTransactionId;
(gdb)
设置时间戳
1906 TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
(gdb)
1917 if (!IsParallelWorker())
(gdb)
1919 if (!SPI_inside_nonatomic_context())
(gdb)
1920 xactStartTimestamp = stmtStartTimestamp;
(gdb)
1926 pgstat_report_xact_timestamp(xactStartTimestamp);
(gdb)
1928 xactStopTimestamp = 0;
(gdb)
(gdb) p xactStartTimestamp
$9 = 601009839154257
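初始化其他字段,随后初始化其他子系统(GUC/Cache/AfterTrigger)(注:所调试的代码版本中,nestingLevel等字段的初始化以及GetUserIdAndSecContext的调用位于设置时间戳之后,与第二节所列源码的顺序不同)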
(gdb) n
1935 s->nestingLevel = 1;
(gdb) n
1936 s->gucNestLevel = 1;
(gdb)
1937 s->childXids = NULL;
(gdb)
1938 s->nChildXids = 0;
(gdb)
1939 s->maxChildXids = 0;
(gdb)
1940 GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
(gdb)
1942 Assert(s->prevSecContext == 0);
(gdb)
1947 AtStart_GUC();
(gdb)
1948 AtStart_Cache();
(gdb)
1949 AfterTriggerBeginXact();
(gdb)
1955 s->state = TRANS_INPROGRESS;
(gdb)
1957 ShowTransactionState("StartTransaction");
(gdb)
1958 }
(gdb)
初始化后的事务状态
(gdb) p *s
$10 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS,
blockState = TBLOCK_DEFAULT, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850,
curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0,
prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}
(gdb)
完成调用
(gdb) n
StartTransactionCommand () at xact.c:2719
2719 s->blockState = TBLOCK_STARTED;
(gdb)
2720 break;
(gdb)
DONE!
网友评论