本节介绍了PostgreSQL提交事务的整体处理逻辑,主要解析了函数CommitTransaction的实现逻辑。
一、数据结构
TransactionState
事务状态结构体
/*
* transaction states - transaction state from server perspective
* 事务状态枚举 - 服务器视角的事务状态
*/
typedef enum TransState
{
TRANS_DEFAULT, /* idle 空闲 */
TRANS_START, /* transaction starting 事务启动 */
TRANS_INPROGRESS, /* inside a valid transaction 进行中 */
TRANS_COMMIT, /* commit in progress 提交中 */
TRANS_ABORT, /* abort in progress 回滚中 */
TRANS_PREPARE /* prepare in progress 准备中 */
} TransState;
/*
* transaction block states - transaction state of client queries
* 事务块状态 - 客户端查询的事务状态
*
* Note: the subtransaction states are used only for non-topmost
* transactions; the others appear only in the topmost transaction.
* 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
*/
typedef enum TBlockState
{
/* not-in-transaction-block states 未进入事务块状态 */
TBLOCK_DEFAULT, /* idle 空闲 */
TBLOCK_STARTED, /* running single-query transaction 单个查询事务 */
/* transaction block states 事务块状态 */
TBLOCK_BEGIN, /* starting transaction block 开始事务块 */
TBLOCK_INPROGRESS, /* live transaction 进行中 */
TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
TBLOCK_END, /* COMMIT received 接收到COMMIT */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_PREPARE, /* live xact, PREPARE received 进行中,接收到PREPARE */
/* subtransaction states 子事务状态 */
TBLOCK_SUBBEGIN, /* starting a subtransaction 开启 */
TBLOCK_SUBINPROGRESS, /* live subtransaction 进行中 */
TBLOCK_SUBRELEASE, /* RELEASE received 接收到RELEASE */
TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
} TBlockState;
/*
* transaction state structure
* 事务状态结构体
*/
typedef struct TransactionStateData
{
//事务ID
TransactionId transactionId; /* my XID, or Invalid if none */
//子事务ID
SubTransactionId subTransactionId; /* my subxact ID */
//保存点名称
char *name; /* savepoint name, if any */
//保存点级别
int savepointLevel; /* savepoint level */
//低级别的事务状态
TransState state; /* low-level state */
//高级别的事务状态
TBlockState blockState; /* high-level state */
//事务嵌套深度
int nestingLevel; /* transaction nesting depth */
//GUC上下文嵌套深度
int gucNestLevel; /* GUC context nesting depth */
//事务生命周期上下文
MemoryContext curTransactionContext; /* my xact-lifetime context */
//查询资源
ResourceOwner curTransactionOwner; /* my query resources */
//按XID顺序保存的已提交的子事务ID
TransactionId *childXids; /* subcommitted child XIDs, in XID order */
//childXids数组大小
int nChildXids; /* # of subcommitted child XIDs */
//分配的childXids数组空间
int maxChildXids; /* allocated size of childXids[] */
//上一个CurrentUserId
Oid prevUser; /* previous CurrentUserId setting */
//上一个SecurityRestrictionContext
int prevSecContext; /* previous SecurityRestrictionContext */
//上一事务是否只读?
bool prevXactReadOnly; /* entry-time xact r/o state */
//是否处于Recovery?
bool startedInRecovery; /* did we start in recovery? */
//XID是否已保存在WAL Record中?
bool didLogXid; /* has xid been included in WAL record? */
//Enter/ExitParallelMode计数器
int parallelModeLevel; /* Enter/ExitParallelMode counter */
//父事务状态
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;
二、源码解读
CommitTransaction函数,提交事务,并执行相关的清理操作.
/*
* CommitTransaction
*
* NB: if you change this routine, better look at PrepareTransaction too!
* 注意:如果改变了这个过程的逻辑,最好同时处理PrepareTransaction.
*/
static void
CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
bool is_parallel_worker;
is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
/* Enforce parallel mode restrictions during parallel worker commit. */
//如为并行worker,强制进入并行模式
if (is_parallel_worker)
EnterParallelMode();
ShowTransactionState("CommitTransaction");
/*
* check the current transaction state
* 检查当前事务状态
*/
if (s->state != TRANS_INPROGRESS)
elog(WARNING, "CommitTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
/*
* Do pre-commit processing that involves calling user-defined code, such
* as triggers. Since closing cursors could queue trigger actions,
* triggers could open cursors, etc, we have to keep looping until there's
* nothing left to do.
* 执行涉及调用用户定义代码(如触发器)的预提交处理。
* 因为关闭游标可能会执行触发器,触发器可能打开游标,等等,
* 所以我们必须一直循环,直到没有什么可做的。
*/
for (;;)
{
/*
* Fire all currently pending deferred triggers.
* 触发所有当前活动的触发器
*/
AfterTriggerFireDeferred();
/*
* Close open portals (converting holdable ones into static portals).
* If there weren't any, we are done ... otherwise loop back to check
* if they queued deferred triggers. Lather, rinse, repeat.
* 关闭打开的portals(将可持有门户转换为静态门户).
* 如果已不存在,则说明已完成.
* 否则一直循环检查触发器队列.
*/
if (!PreCommit_Portals(false))
break;
}
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
: XACT_EVENT_PRE_COMMIT);
/*
* The remaining actions cannot call any user-defined code, so it's safe
* to start shutting down within-transaction services. But note that most
* of this stuff could still throw an error, which would switch us into
* the transaction-abort path.
* 其余的操作不能调用任何用户定义的代码,因此可以安全地开始关闭事务内的服务。
* 但是请注意,大多数这些动作仍然会抛出错误,这将把执行流程切换到事务中止执行路径上。
*/
/* If we might have parallel workers, clean them up now. */
//存在并行worker,清除之
if (IsInParallelMode())
AtEOXact_Parallel(true);
/* Shut down the deferred-trigger manager */
//关闭延迟触发器管理器
AfterTriggerEndXact(true);
/*
* Let ON COMMIT management do its thing (must happen after closing
* cursors, to avoid dangling-reference problems)
* 让ON COMMIT管理器执行这个事情.
* (必须在关闭游标后发生,以避免挂起引用问题)
*/
PreCommit_on_commit_actions();
/* close large objects before lower-level cleanup */
//在低级别的清理请,关闭大对象
AtEOXact_LargeObject(true);
/*
* Mark serializable transaction as complete for predicate locking
* purposes. This should be done as late as we can put it and still allow
* errors to be raised for failure patterns found at commit.
* 将可序列化事务标记为谓词锁定完成。
* 这应该尽可能迟地完成,并且仍然允许在提交时发现的失败从而引发错误。
*/
PreCommit_CheckForSerializationFailure();
/*
* Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
* holding the notify-insertion lock.
* 将NOTIFY命令发送的通知插入到队列中。
* 这应该在预提交序列的末尾,以最小化持有通知插入锁的时间。
*/
PreCommit_Notify();
/* Prevent cancel/die interrupt while cleaning up */
//在清理时禁用中断
HOLD_INTERRUPTS();
/* Commit updates to the relation map --- do this as late as possible */
//提交更新到relation map -- 尽可能晚的执行该动作
AtEOXact_RelationMap(true, is_parallel_worker);
/*
* set the current transaction state information appropriately during
* commit processing
* 在commit过程中设置当前事务状态信息.
*/
s->state = TRANS_COMMIT;
s->parallelModeLevel = 0;
if (!is_parallel_worker)
{
/*
* We need to mark our XIDs as committed in pg_xact. This is where we
* durably commit.
* 我们需要在pg_xact中将xid标记为已提交。
*/
latestXid = RecordTransactionCommit();
}
else
{
/*
* We must not mark our XID committed; the parallel master is
* responsible for that.
* 并行worker,不需要标记XID的提交标记,并行管理器处理此事情.
*/
latestXid = InvalidTransactionId;
/*
* Make sure the master will know about any WAL we wrote before it
* commits.
* 确保master在提交之前知道worker写入的WAL。
*/
ParallelWorkerReportLastRecEnd(XactLastRecEnd);
}
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
/*
* Let others know about no transaction in progress by me. Note that this
* must be done _before_ releasing locks we hold and _after_
* RecordTransactionCommit.
* 通知其他进程知道该进程没有进行中的事务。
* 注意,这必须在释放持有的锁之前执行,并在RecordTransactionCommit之后执行。
*/
ProcArrayEndTransaction(MyProc, latestXid);
/*
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
* noncritical resource releasing.
* 这些都是提交后清理。
* 请注意,如果这里出现错误,则终止事务就太迟了.
* 这应该是非关键的资源释放。
*
* The ordering of operations is not entirely random. The idea is:
* release resources visible to other backends (eg, files, buffer pins);
* then release locks; then release backend-local resources. We want to
* release locks at the point where any backend waiting for us will see
* our transaction as being fully cleaned up.
* 操作的顺序并不是完全随机的。
* 其思想是:释放对其他后台进程可见的资源(如文件、buffer pins);
* 然后释放锁;然后释放后端本地资源。
* 我们希望在所有等待我们的后台进程看到我们的事务被完全清理的时候才释放锁。
*
* Resources that can be associated with individual queries are handled by
* the ResourceOwner mechanism. The other calls here are for backend-wide
* state.
* 与单个查询关联的资源由ResourceOwner机制处理。
* 这里的其他调用是针对后台进程范围的状态的。
*/
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
: XACT_EVENT_COMMIT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
true, true);
/* Check we've released all buffer pins */
//检查已释放所有的buffer pins
AtEOXact_Buffers(true);
/* Clean up the relation cache */
//清理关系缓存
AtEOXact_RelationCache(true);
/*
* Make catalog changes visible to all backends. This has to happen after
* relcache references are dropped (see comments for
* AtEOXact_RelationCache), but before locks are released (if anyone is
* waiting for lock on a relation we've modified, we want them to know
* about the catalog change before they start using the relation).
* 使目录更改对所有后台进程可见。
* 这必须发生在relcache引用被删除之后(参见AtEOXact_RelationCache注释),
* 但是在锁被释放之前(如果有人在等待我们修改的关系的锁,我们希望他们在开始使用关系之前知道目录的更改)。
*/
AtEOXact_Inval(true);
AtEOXact_MultiXact();
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_AFTER_LOCKS,
true, true);
/*
* Likewise, dropping of files deleted during the transaction is best done
* after releasing relcache and buffer pins. (This is not strictly
* necessary during commit, since such pins should have been released
* already, but this ordering is definitely critical during abort.) Since
* this may take many seconds, also delay until after releasing locks.
* Other backends will observe the attendant catalog changes and not
* attempt to access affected files.
* 同样,在事务期间删除的文件的清理最好在释放relcache和buffer pin之后进行。
* (这在提交过程中并不是必须的,因为这样的pins应该已经被释放了,
* 但是该顺序在中止过程中绝对是至关重要的。)
* 因为这可能需要较长的时间,所以也要延迟到释放锁之后。
* 其他后台进程将监控相关的catalog更改,不尝试访问受影响的文件。
*/
smgrDoPendingDeletes(true);
AtCommit_Notify();
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
AtEOXact_Enum();
AtEOXact_on_commit_actions(true);
AtEOXact_Namespace(true, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files(true);
AtEOXact_ComboCid();
AtEOXact_HashTables(true);
AtEOXact_PgStat(true);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
ResourceOwnerDelete(TopTransactionResourceOwner);
s->curTransactionOwner = NULL;
CurTransactionResourceOwner = NULL;
TopTransactionResourceOwner = NULL;
AtCommit_Memory();
s->transactionId = InvalidTransactionId;
s->subTransactionId = InvalidSubTransactionId;
s->nestingLevel = 0;
s->gucNestLevel = 0;
s->childXids = NULL;
s->nChildXids = 0;
s->maxChildXids = 0;
XactTopTransactionId = InvalidTransactionId;
nParallelCurrentXids = 0;
/*
* done with commit processing, set current transaction state back to
* default
* 完成提交处理后,将当前事务状态设置为default
*/
s->state = TRANS_DEFAULT;
RESUME_INTERRUPTS();
}
三、跟踪分析
插入数据,执行commit
10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;
启动gdb,设置断点
(gdb) b CommitTransaction
Breakpoint 1 at 0x5482ae: file xact.c, line 1969.
(gdb) c
Continuing.
Breakpoint 1, CommitTransaction () at xact.c:1969
1969 TransactionState s = CurrentTransactionState;
(gdb)
查看调用栈
(gdb) bt
#0 CommitTransaction () at xact.c:1969
#1 0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#2 0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#3 0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#4 0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
at postgres.c:4182
#5 0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#6 0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#7 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#8 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#9 0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)
当前事务信息
(gdb) p *s
$1 = {transactionId = 2410, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS,
blockState = TBLOCK_END, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2d3cfa0,
curTransactionOwner = 0x2cc5868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0,
prevXactReadOnly = false, startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}
(gdb)
执行相关判断,执行预处理等
(gdb) n
1976 if (is_parallel_worker)
(gdb)
1979 ShowTransactionState("CommitTransaction");
(gdb)
1984 if (s->state != TRANS_INPROGRESS)
(gdb)
1987 Assert(s->parent == NULL);
(gdb)
2000 AfterTriggerFireDeferred();
(gdb)
2007 if (!PreCommit_Portals(false))
(gdb)
2008 break;
(gdb)
2011 CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
(gdb)
继续执行预处理
(gdb) n
2022 if (IsInParallelMode())
(gdb)
2026 AfterTriggerEndXact(true);
(gdb)
2032 PreCommit_on_commit_actions();
(gdb)
2035 AtEOXact_LargeObject(true);
(gdb)
(gdb)
2042 PreCommit_CheckForSerializationFailure();
(gdb)
2049 PreCommit_Notify();
(gdb)
2052 HOLD_INTERRUPTS();
(gdb)
2055 AtEOXact_RelationMap(true);
(gdb)
修改事务状态
2061 s->state = TRANS_COMMIT;
(gdb)
2062 s->parallelModeLevel = 0;
(gdb)
执行实际的提交事务操作
(gdb)
2064 if (!is_parallel_worker)
(gdb)
2070 latestXid = RecordTransactionCommit();
(gdb)
2087 TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)
通知其他进程知道该进程没有进行中的事务。
(gdb)
2094 ProcArrayEndTransaction(MyProc, latestXid);
(gdb)
2112 CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
(gdb)
释放资源
(gdb)
2115 ResourceOwnerRelease(TopTransactionResourceOwner,
(gdb)
2120 AtEOXact_Buffers(true);
(gdb)
2123 AtEOXact_RelationCache(true);
(gdb)
2132 AtEOXact_Inval(true);
(gdb)
2134 AtEOXact_MultiXact();
(gdb)
2136 ResourceOwnerRelease(TopTransactionResourceOwner,
(gdb)
2139 ResourceOwnerRelease(TopTransactionResourceOwner,
执行清理操作
(gdb)
2152 smgrDoPendingDeletes(true);
(gdb)
2154 AtCommit_Notify();
(gdb)
2155 AtEOXact_GUC(true, 1);
(gdb)
2156 AtEOXact_SPI(true);
(gdb)
2157 AtEOXact_on_commit_actions(true);
(gdb)
2158 AtEOXact_Namespace(true, is_parallel_worker);
(gdb)
2159 AtEOXact_SMgr();
(gdb)
2160 AtEOXact_Files(true);
(gdb)
2161 AtEOXact_ComboCid();
(gdb)
2162 AtEOXact_HashTables(true);
(gdb)
2163 AtEOXact_PgStat(true);
(gdb)
2164 AtEOXact_Snapshot(true, false);
(gdb)
2165 AtEOXact_ApplyLauncher(true);
(gdb)
2166 pgstat_report_xact_timestamp(0);
(gdb)
2168 CurrentResourceOwner = NULL;
(gdb)
2169 ResourceOwnerDelete(TopTransactionResourceOwner);
(gdb)
2170 s->curTransactionOwner = NULL;
(gdb)
重置事务状态
(gdb)
2171 CurTransactionResourceOwner = NULL;
(gdb)
2172 TopTransactionResourceOwner = NULL;
(gdb)
2174 AtCommit_Memory();
(gdb)
2176 s->transactionId = InvalidTransactionId;
(gdb)
2177 s->subTransactionId = InvalidSubTransactionId;
(gdb)
2178 s->nestingLevel = 0;
(gdb)
2179 s->gucNestLevel = 0;
(gdb)
2180 s->childXids = NULL;
(gdb)
2181 s->nChildXids = 0;
(gdb)
2182 s->maxChildXids = 0;
(gdb)
2184 XactTopTransactionId = InvalidTransactionId;
(gdb)
2185 nParallelCurrentXids = 0;
(gdb)
2191 s->state = TRANS_DEFAULT;
(gdb)
2193 RESUME_INTERRUPTS();
(gdb)
2194 }
(gdb)
重置后的事务状态
(gdb) p *s
$2 = {transactionId = 0, subTransactionId = 0, name = 0x0, savepointLevel = 0, state = TRANS_DEFAULT,
blockState = TBLOCK_END, nestingLevel = 0, gucNestLevel = 0, curTransactionContext = 0x0, curTransactionOwner = 0x0,
childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, prevXactReadOnly = false,
startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}
(gdb)
执行完毕
(gdb) n
CommitTransactionCommand () at xact.c:2832
2832 s->blockState = TBLOCK_DEFAULT;
(gdb)
DONE!
网友评论