美文网首页PostgreSQL
PostgreSQL 源码解读(4)- 插入数据#3(heap_insert)

PostgreSQL 源码解读(4)- 插入数据#3(heap_insert)

作者: EthanHe | 来源:发表于2018-08-02 15:56 被阅读24次

    本文简单介绍了PG插入数据部分的源码,这是第三部分,主要内容包括heap_insert函数的实现逻辑,该函数在源文件heapam.c中。

    一、基础信息

    heap_insert使用的数据结构、宏定义以及依赖的函数等。
    数据结构/宏定义

    1、CommandId
    32bit无符号整型
    /* CommandId: a command identifier, stored as an unsigned 32-bit integer. */
    typedef uint32 CommandId;
    
    2、options
    整型,标记bits
     /* "options" flag bits for heap_insert */
     #define HEAP_INSERT_SKIP_WAL    0x0001
     #define HEAP_INSERT_SKIP_FSM    0x0002
     #define HEAP_INSERT_FROZEN      0x0004
     #define HEAP_INSERT_SPECULATIVE 0x0008
    
    3、BulkInsertState
    批量插入状态指针
      /*
      * state for bulk inserts --- private to heapam.c and hio.c
      *
      * If current_buf isn't InvalidBuffer, then we are holding an extra pin
      * on that buffer.
      *
      * "typedef struct BulkInsertStateData *BulkInsertState" is in heapam.h
      */
     typedef struct BulkInsertStateData
     {
         BufferAccessStrategy strategy;  /* our BULKWRITE strategy object */
         Buffer      current_buf;    /* current insertion target page */
     }     BulkInsertStateData;
     
    /* BulkInsertState is a pointer to the struct above; callers pass NULL when not bulk inserting. */
    typedef struct BulkInsertStateData *BulkInsertState;
    
    4、TransactionId
    32bit无符号整型
     /* The three transaction-related identifier types are all unsigned 32-bit integers. */
     typedef uint32 TransactionId;
     typedef uint32 LocalTransactionId;
     typedef uint32 SubTransactionId;
    
    5、xl_heap_insert
     /*
      * xl_heap_insert
      *
      * NOTE(review): presumably the fixed-size part of the WAL record written
      * for a heap insert -- confirm against heap_insert.
      */
     typedef struct xl_heap_insert
     {
         OffsetNumber offnum;        /* inserted tuple's offset */
         uint8       flags;
         /* xl_heap_header & TUPLE DATA in backup block 0 */
     } xl_heap_insert;
     
     /*
      * Size of the struct up to and including "flags", i.e. without any
      * padding the compiler may add after the last member.
      */
     #define SizeOfHeapInsert    (offsetof(xl_heap_insert, flags) + sizeof(uint8))
    
    6、xl_heap_header
     /*
      * xl_heap_header
      *
      * Holds three tuple-header values: t_infomask2, t_infomask and t_hoff.
      * NOTE(review): presumably the subset of the tuple header that is stored
      * in WAL records -- confirm.
      */
     typedef struct xl_heap_header
     {
         uint16      t_infomask2;
         uint16      t_infomask;
         uint8       t_hoff;
     } xl_heap_header;
     
     /*
      * Size of the struct up to and including "t_hoff", i.e. without any
      * padding the compiler may add after the last member.
      */
     #define SizeOfHeapHeader    (offsetof(xl_heap_header, t_hoff) + sizeof(uint8))
    
    7、XLogRecPtr
    64bit无符号长整型
     /* XLogRecPtr is an unsigned 64-bit integer. */
     typedef uint64 XLogRecPtr;
    
    

    依赖的函数
    1、heap_prepare_insert

    /*
     * heap_prepare_insert
     *
     * Helper for heap_insert(): stamp the tuple header for a new insertion,
     * give the tuple an OID if the relation uses OIDs and the caller supplied
     * none, and hand the tuple to the toaster when that is required.
     *
     * The header fields of "tup" are always updated in place.  The return
     * value is the toasted tuple when toasting took place, otherwise "tup"
     * itself.
     */
    static HeapTuple
    heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
                        CommandId cid, int options)
    {
        /*
         * A parallel worker must remain strictly read-only.  Inserting from
         * the leader is not safe in general either (group locking keeps
         * relation-extension and GIN page locks from conflicting inside a lock
         * group), but that case is deliberately not rejected here because some
         * special cases, such as CREATE TABLE AS, are known to be safe.
         */
        if (IsParallelWorker())
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                     errmsg("cannot insert tuples in a parallel worker")));

        if (!relation->rd_rel->relhasoids)
        {
            /* relation has no OIDs, so the tuple must not reserve room for one */
            Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
        }
        else
        {
    #ifdef NOT_USED
            /* duplicates an Assert already made inside HeapTupleSetOid */
            Assert(tup->t_data->t_infomask & HEAP_HASOID);
    #endif

            /*
             * An OID that is already valid was chosen by the caller and is
             * kept (this happens at initial database creation and when an
             * index is defined); otherwise allocate a new one.
             */
            if (!OidIsValid(HeapTupleGetOid(tup)))
                HeapTupleSetOid(tup, GetNewOid(relation));
        }

        /*
         * Reset the transaction-status bits in both infomask words, then flag
         * xmax as invalid: a freshly inserted tuple has no deleter.
         */
        tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
        tup->t_data->t_infomask =
            (tup->t_data->t_infomask & ~(HEAP_XACT_MASK)) | HEAP_XMAX_INVALID;

        /* xmin is the inserting transaction; mark it frozen if the caller asked */
        HeapTupleHeaderSetXmin(tup->t_data, xid);
        if (options & HEAP_INSERT_FROZEN)
            HeapTupleHeaderSetXminFrozen(tup->t_data);

        /* record the inserting command id */
        HeapTupleHeaderSetCmin(tup->t_data, cid);

        /* xmax is meaningless here; zero it for cleanliness */
        HeapTupleHeaderSetXmax(tup->t_data, 0);

        /* remember which table the tuple belongs to */
        tup->t_tableOid = RelationGetRelid(relation);

        /*
         * Only plain tables and materialized views go through the toaster,
         * and only when the tuple is too big or already carries out-of-line
         * attributes from some other relation.
         */
        if (relation->rd_rel->relkind == RELKIND_RELATION ||
            relation->rd_rel->relkind == RELKIND_MATVIEW)
        {
            if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
                return toast_insert_or_update(relation, tup, NULL, options);

            return tup;
        }

        /* toast table entries should never be recursively toasted */
        Assert(!HeapTupleHasExternal(tup));
        return tup;
    }
    

    2、RelationGetBufferForTuple
    稍长,请耐心阅读,如能读懂,必有收获

    /*
     * RelationGetBufferForTuple
     *
     * Find a page with room for a tuple of "len" bytes, extending the
     * relation if no existing page will do, and return its buffer pinned and
     * exclusive-locked.
     *
     * Parameters:
     *   relation       - the table being inserted into
     *   len            - space the tuple needs (rounded up with MAXALIGN here)
     *   otherBuffer    - update case only: a buffer the caller already has
     *                    pinned; InvalidBuffer for a plain insert.  When valid
     *                    it is exclusive-locked by this function as well.
     *   options        - option bits; only HEAP_INSERT_SKIP_FSM is examined here
     *   bistate        - bulk-insert state, or NULL when not bulk inserting
     *   vmbuffer       - visibility map buffer for the target page
     *   vmbuffer_other - update case only: visibility map buffer for the page
     *                    held in otherBuffer
     *
     * Returns the buffer chosen for the tuple.
     *
     * Why otherBuffer exists: an UPDATE does not overwrite the row in place.
     * The old version is kept (its xmax is updated) and the new version is
     * inserted.  When old and new versions live in different blocks, both
     * blocks must be locked, which can deadlock.  Example: session A updates
     * row 1 and session B updates row 2 of table T; both rows are in block 0
     * and both new versions go to block 2.  If A locks block 2 first while B
     * locks block 0 first, then A waits for block 0 and B waits for block 2.
     * To prevent this, blocks of the same relation are always locked in
     * ascending block-number order: to lock 0 and 2, lock 0 first, then 2.
     *
     * A pinned buffer is one that is currently in use and therefore must not
     * be evicted.
     */
    Buffer
    RelationGetBufferForTuple(Relation relation, Size len,
                              Buffer otherBuffer, int options,
                              BulkInsertState bistate,
                              Buffer *vmbuffer, Buffer *vmbuffer_other)
    {
        bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM); /* consult the FSM for free space? */
        Buffer      buffer = InvalidBuffer; /* buffer that will be returned */
        Page        page; /* page held by "buffer" */
        Size        pageFreeSpace = 0, /* free space found on the candidate page */
                    saveFreeSpace = 0; /* space to leave unused on each page */
        BlockNumber targetBlock, /* block we are trying to insert into */
                    otherBlock; /* block number of otherBuffer (update case) */
        bool        needLock; /* is the relation extension lock required? */
    
        len = MAXALIGN(len);        /* be conservative; round up to the alignment boundary */
    
        /* Bulk insert is not supported for updates, only inserts. */
        Assert(otherBuffer == InvalidBuffer || !bistate); /* a valid otherBuffer means an update, which cannot use bulk insert */
    
        /*
         * If we're gonna fail for oversize tuple, do it right away
         */
        if (len > MaxHeapTupleSize)
            ereport(ERROR,
                    (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                     errmsg("row is too big: size %zu, maximum size %zu",
                            len, MaxHeapTupleSize)));
    
        /* Compute desired extra freespace due to fillfactor option */
        /* this is the amount of space to keep free on each page */
        saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                       HEAP_DEFAULT_FILLFACTOR);
        /* update case: remember which block the caller's pinned buffer holds */
        if (otherBuffer != InvalidBuffer)
            otherBlock = BufferGetBlockNumber(otherBuffer);
        else
            otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
    
        /*
         * We first try to put the tuple on the same page we last inserted a tuple
         * on, as cached in the BulkInsertState or relcache entry.  If that
         * doesn't work, we ask the Free Space Map to locate a suitable page.
         * Since the FSM's info might be out of date, we have to be prepared to
         * loop around and retry multiple times. (To insure this isn't an infinite
         * loop, we must update the FSM with the correct amount of free space on
         * each page that proves not to be suitable.)  If the FSM has no record of
         * a page with enough free space, we give up and extend the relation.
         *
         * When use_fsm is false, we either put the tuple onto the existing target
         * page or extend the relation.
         */
        if (len + saveFreeSpace > MaxHeapTupleSize)
        {
            /*
             * Needed size plus reserved space exceeds the largest possible
             * tuple: skip the FSM and go straight to extending the relation.
             */
            /* can't fit, don't bother asking FSM */
            targetBlock = InvalidBlockNumber;
            use_fsm = false;
        }
        else if (bistate && bistate->current_buf != InvalidBuffer) /* bulk insert: start from its current buffer */
            targetBlock = BufferGetBlockNumber(bistate->current_buf);
        else
            targetBlock = RelationGetTargetBlock(relation); /* ordinary insert: use the relation's cached target block */
    
        if (targetBlock == InvalidBlockNumber && use_fsm) /* no target block yet and the FSM may be used */
        {
            /*
             * We have no cached target page, so ask the FSM for an initial
             * target.
             */
            /* ask the FSM for a page with at least len + saveFreeSpace bytes free */
            targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
    
            /*
             * If the FSM knows nothing of the rel, try the last page before we
             * give up and extend.  This avoids one-tuple-per-page syndrome during
             * bootstrapping or in a recently-started system.
             */
            /* the FSM had nothing: fall back to the relation's last block, if it has any */
            if (targetBlock == InvalidBlockNumber)
            {
                BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
    
                if (nblocks > 0)
                    targetBlock = nblocks - 1;
            }
        }
    
    loop:
        while (targetBlock != InvalidBlockNumber) /* we have a candidate block to try */
        {
            /*
             * Read and exclusive-lock the target block, as well as the other
             * block if one was given, taking suitable care with lock ordering and
             * the possibility they are the same block.
             *
             * If the page-level all-visible flag is set, caller will need to
             * clear both that and the corresponding visibility map bit.  However,
             * by the time we return, we'll have x-locked the buffer, and we don't
             * want to do any I/O while in that state.  So we check the bit here
             * before taking the lock, and pin the page if it appears necessary.
             * Checking without the lock creates a risk of getting the wrong
             * answer, so we'll have to recheck after acquiring the lock.
             */
            if (otherBuffer == InvalidBuffer) /* not an update */
            {
                /* easy case */
                buffer = ReadBufferBI(relation, targetBlock, bistate); /* read and pin the target block */
                if (PageIsAllVisible(BufferGetPage(buffer)))
                    /* page looks all-visible: pin its visibility map page now, before taking the buffer lock */
                    visibilitymap_pin(relation, targetBlock, vmbuffer);
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* exclusive-lock the buffer */
            }
            else if (otherBlock == targetBlock) /* update: new tuple goes into the same block as the old one */
            {
                /* also easy case */
                buffer = otherBuffer;
                if (PageIsAllVisible(BufferGetPage(buffer)))
                    visibilitymap_pin(relation, targetBlock, vmbuffer);
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
            }
            else if (otherBlock < targetBlock) /* update: the old tuple's block precedes the target block */
            {
                /* lock other buffer first */
                buffer = ReadBuffer(relation, targetBlock);
                if (PageIsAllVisible(BufferGetPage(buffer)))
                    visibilitymap_pin(relation, targetBlock, vmbuffer);
                LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); /* lower block number is locked first */
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
            }
            else /* update: the old tuple's block follows the target block */
            {
                /* lock target buffer first */
                buffer = ReadBuffer(relation, targetBlock);
                if (PageIsAllVisible(BufferGetPage(buffer)))
                    visibilitymap_pin(relation, targetBlock, vmbuffer);
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* lower block number is locked first */
                LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
            }
    
            /*
             * We now have the target page (and the other buffer, if any) pinned
             * and locked.  However, since our initial PageIsAllVisible checks
             * were performed before acquiring the lock, the results might now be
             * out of date, either for the selected victim buffer, or for the
             * other buffer passed by the caller.  In that case, we'll need to
             * give up our locks, go get the pin(s) we failed to get earlier, and
             * re-lock.  That's pretty painful, but hopefully shouldn't happen
             * often.
             *
             * Note that there's a small possibility that we didn't pin the page
             * above but still have the correct page pinned anyway, either because
             * we've already made a previous pass through this loop, or because
             * caller passed us the right page anyway.
             *
             * Note also that it's possible that by the time we get the pin and
             * retake the buffer locks, the visibility map bit will have been
             * cleared by some other backend anyway.  In that case, we'll have
             * done a bit of extra work for no gain, but there's no real harm
             * done.
             */
            if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
                GetVisibilityMapPins(relation, buffer, otherBuffer,
                                     targetBlock, otherBlock, vmbuffer,
                                     vmbuffer_other); /* obtain any visibility map pins still missing */
            else
                GetVisibilityMapPins(relation, otherBuffer, buffer,
                                     otherBlock, targetBlock, vmbuffer_other,
                                     vmbuffer); /* same call with the two buffers swapped */
    
            /*
             * Now we can check to see if there's enough free space here. If so,
             * we're done.
             */
            page = BufferGetPage(buffer);
            pageFreeSpace = PageGetHeapFreeSpace(page);
            if (len + saveFreeSpace <= pageFreeSpace) /* enough room: return this buffer */
            {
                /* use this page as future insert target, too */
                RelationSetTargetBlock(relation, targetBlock);
                return buffer;
            }
    
            /*
             * Not enough space, so we must give up our page locks and pin (if
             * any) and prepare to look elsewhere.  We don't care which order we
             * unlock the two buffers in, so this can be slightly simpler than the
             * code above.
             */
            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
            if (otherBuffer == InvalidBuffer)
                ReleaseBuffer(buffer);
            else if (otherBlock != targetBlock)
            {
                LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(buffer);
            }
    
            /* Without FSM, always fall out of the loop and extend */
            if (!use_fsm) /* FSM is not in use: leave the loop and extend the relation */
                break;
    
            /*
             * Update FSM as to condition of this page, and ask for another page
             * to try.
             */
            /*
             * Ask the FSM for the next candidate block.  If it has none to
             * offer, targetBlock becomes InvalidBlockNumber and the loop ends.
             */
            targetBlock = RecordAndGetPageWithFreeSpace(relation,
                                                        targetBlock,
                                                        pageFreeSpace,
                                                        len + saveFreeSpace);
        }
        
        /* No suitable block was found: extend the relation. */
        /*
         * Have to extend the relation.
         *
         * We have to use a lock to ensure no one else is extending the rel at the
         * same time, else we will both try to initialize the same new page.  We
         * can skip locking for new or temp relations, however, since no one else
         * could be accessing them.
         */
        needLock = !RELATION_IS_LOCAL(relation); /* new or temp relations need no lock */
    
        /*
         * If we need the lock but are not able to acquire it immediately, we'll
         * consider extending the relation by multiple blocks at a time to manage
         * contention on the relation extension lock.  However, this only makes
         * sense if we're using the FSM; otherwise, there's no point.
         */
        if (needLock) /* the extension lock is required */
        {
            if (!use_fsm)
                LockRelationForExtension(relation, ExclusiveLock);
            else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
            {
                /* Couldn't get the lock immediately; wait for it. */
                LockRelationForExtension(relation, ExclusiveLock);
    
                /*
                 * Check if some other backend has extended a block for us while
                 * we were waiting on the lock.
                 */
                /* if another backend extended the relation, the FSM may now offer a suitable block */
                targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
    
                /*
                 * If some other waiter has already extended the relation, we
                 * don't need to do so; just use the existing freespace.
                 */
                if (targetBlock != InvalidBlockNumber)
                {
                    UnlockRelationForExtension(relation, ExclusiveLock);
                    goto loop;
                }
    
                /* Time to bulk-extend. */
                /* nobody extended the relation for us, so add the extra blocks ourselves */
                RelationAddExtraBlocks(relation, bistate);
            }
        }
    
        /*
         * In addition to whatever extension we performed above, we always add at
         * least one block to satisfy our own request.
         *
         * XXX This does an lseek - rather expensive - but at the moment it is the
         * only way to accurately determine how many blocks are in a relation.  Is
         * it worth keeping an accurate file length in shared memory someplace,
         * rather than relying on the kernel to do it for us?
         */
        /* P_NEW requests a brand-new block at the end of the relation */
        buffer = ReadBufferBI(relation, P_NEW, bistate);
    
        /*
         * We can be certain that locking the otherBuffer first is OK, since it
         * must have a lower page number.
         */
        if (otherBuffer != InvalidBuffer)
            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); /* otherBuffer precedes the new block, so it is locked first */
    
        /*
         * Now acquire lock on the new page.
         */
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* lock the new page */
    
        /*
         * Release the file-extension lock; it's now OK for someone else to extend
         * the relation some more.  Note that we cannot release this lock before
         * we have buffer lock on the new page, or we risk a race condition
         * against vacuumlazy.c --- see comments therein.
         */
        if (needLock)
            UnlockRelationForExtension(relation, ExclusiveLock); /* release the extension lock */
    
        /*
         * We need to initialize the empty new page.  Double-check that it really
         * is empty (this should never happen, but if it does we don't want to
         * risk wiping out valid data).
         */
        page = BufferGetPage(buffer); /* page held by the new buffer */
    
        if (!PageIsNew(page)) /* not a new page: something has gone wrong */
            elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
                 BufferGetBlockNumber(buffer),
                 RelationGetRelationName(relation));
        /* initialize the new page */
        PageInit(page, BufferGetPageSize(buffer), 0);
        /* even a freshly initialized page cannot hold the tuple: report it */
        if (len > PageGetHeapFreeSpace(page))
        {
            /* We should not get here given the test at the top */
            elog(PANIC, "tuple is too big: size %zu", len);
        }
    
        /*
         * Remember the new page as our target for future insertions.
         *
         * XXX should we enter the new page into the free space map immediately,
         * or just keep it for this backend's exclusive use in the short run
         * (until VACUUM sees it)?  Seems to depend on whether you expect the
         * current backend to make more insertions or not, which is probably a
         * good bet most of the time.  So for now, don't add it to FSM yet.
         */
        /* record the new block as the relation's insertion target */
        RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
        /* hand the new buffer back to the caller */
        return buffer;
    }
    
    //-------------------------------------------------------------------------------
    /*
     * ReadBufferBI
     *
     * Fetch the buffer for "targetBlock".  With no bulk-insert state this is
     * simply ReadBuffer().  With one, the buffer remembered in
     * bistate->current_buf is reused when it already holds the wanted block;
     * otherwise it is dropped, the block is read with the bulk-insert
     * strategy, and the new buffer is remembered in its place.
     *
     * In bulk-insert mode the returned buffer carries one reference for the
     * caller in addition to the one kept on behalf of bistate->current_buf.
     */
    static Buffer
    ReadBufferBI(Relation relation, BlockNumber targetBlock,
                 BulkInsertState bistate)
    {
        Buffer      cached;
        Buffer      fresh;

        /* not a bulk insert: behave exactly like ReadBuffer */
        if (!bistate)
            return ReadBuffer(relation, targetBlock);

        cached = bistate->current_buf;

        /* the remembered buffer already holds the block: re-pin and return it */
        if (cached != InvalidBuffer &&
            BufferGetBlockNumber(cached) == targetBlock)
        {
            IncrBufferRefCount(cached);
            return cached;
        }

        /* a different block is remembered: give up that buffer */
        if (cached != InvalidBuffer)
        {
            ReleaseBuffer(cached);
            bistate->current_buf = InvalidBuffer;
        }

        /* read the block through the bulk-insert access strategy */
        fresh = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
                                   RBM_NORMAL, bistate->strategy);

        /* keep an extra reference so it can serve as the next insert target */
        IncrBufferRefCount(fresh);
        bistate->current_buf = fresh;

        return fresh;
    }
    
     /*
      * ReadBuffer -- convenience wrapper around ReadBufferExtended.
      *
      * Reads block "blockNum" of the relation's main fork in RBM_NORMAL mode,
      * passing NULL as the strategy so the default one is used.
      */
     Buffer
     ReadBuffer(Relation reln, BlockNumber blockNum)
     {
         Buffer      result;

         result = ReadBufferExtended(reln, MAIN_FORKNUM, blockNum,
                                     RBM_NORMAL, NULL);

         return result;
     }
    
     /*
      * ForkNumber - selects one fork of a relation.
      *
      * The valid forks are numbered consecutively from MAIN_FORKNUM = 0;
      * InvalidForkNumber is -1.
      */
     typedef enum ForkNumber
     {
         InvalidForkNumber = -1,
         MAIN_FORKNUM = 0,           /* the fork ReadBuffer() reads from */
         FSM_FORKNUM,                /* = 1; presumably the free space map -- confirm */
         VISIBILITYMAP_FORKNUM,      /* = 2; presumably the visibility map -- confirm */
         INIT_FORKNUM                /* = 3 */
     
         /*
          * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
          * FORKNAMECHARS below, and update the forkNames array in
          * src/common/relpath.c
          */
     } ForkNumber; 
    //参考url : https://www.postgresql.org/docs/11/static/storage-file-layout.html
    
     /*
      * ReadBufferExtended -- return a pinned buffer holding the requested
      *      block of the requested fork of a relation.
      *
      * Passing P_NEW as blockNum extends the relation file and allocates a new
      * block.  The caller must make sure that only one backend extends a given
      * relation at any one time.
      *
      * Returns the buffer number of the buffer containing the block; that
      * buffer has been pinned.  On error this function does not return; the
      * error is raised through elog instead.
      *
      * The relation must already have been opened by the caller.
      *
      * Modes:
      *   RBM_NORMAL        - read the page from disk and validate its header;
      *                       an invalid header raises an error.  An all-zero
      *                       page counts as valid (see PageIsVerified()).
      *   RBM_ZERO_ON_ERROR - like RBM_NORMAL, except that a page with an
      *                       invalid header is zeroed rather than rejected.
      *                       Meant for non-critical data whose caller is
      *                       prepared to repair errors.
      *   RBM_ZERO_AND_LOCK - if the page is not already in the buffer cache it
      *                       is filled with zeros instead of being read.
      *                       Useful when the caller will build the page from
      *                       scratch: it saves I/O and avoids failing on a
      *                       corrupt on-disk header.  The page is returned
      *                       locked so the caller can initialize it before
      *                       anyone else sees it.  Do not use this mode for a
      *                       page beyond the relation's current physical EOF;
      *                       that is likely to cause trouble in md.c when the
      *                       page is modified and written out.  P_NEW is fine.
      *   RBM_ZERO_AND_CLEANUP_LOCK
      *                     - same as RBM_ZERO_AND_LOCK, but takes a
      *                       cleanup-strength lock on the page.
      *   RBM_NORMAL_NO_LOG - handled the same as RBM_NORMAL here.
      *
      * A non-NULL strategy selects a nondefault buffer access strategy; see
      * buffer/README for details.
      */
     Buffer
     ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
                        ReadBufferMode mode, BufferAccessStrategy strategy)
     {
         Buffer      result;
         bool        was_cached;

         /* make sure the relation is open at the storage-manager level */
         RelationOpenSmgr(reln);

         /*
          * Temporary relations owned by another session are off limits: their
          * pages live in that session's local buffers, which we cannot see,
          * so we would probably read wrong data.
          */
         if (RELATION_IS_OTHER_TEMP(reln))
             ereport(ERROR,
                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                      errmsg("cannot access temporary tables of other sessions")));

         /* count the read, do it, and count a cache hit if there was one */
         pgstat_count_buffer_read(reln);

         result = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
                                    forkNum, blockNum, mode, strategy,
                                    &was_cached);

         if (was_cached)
             pgstat_count_buffer_hit(reln);

         return result;
     }
     
    

    3、CheckForSerializableConflictIn
    检查本次写入是否与其他可串行化(SERIALIZABLE)事务持有的谓词锁产生读写冲突(rw-conflict)。检查按元组、页、表三个粒度由细到粗依次进行;例如并发执行 delete 与 update 操作时就可能出现这类冲突。

    /*
      * CheckForSerializableConflictIn
      *      We are writing the given tuple.  If that indicates a rw-conflict
      *      in from another serializable transaction, take appropriate action.
      *
      * Skip checking for any granularity for which a parameter is missing.
      *
      * A tuple update or delete is in conflict if we have a predicate lock
      * against the relation or page in which the tuple exists, or against the
      * tuple itself.
      *
      * Parameters:
      *   relation - relation being written; the relation-level check is always
      *              made once past the early exits below
      *   tuple    - tuple being written, or NULL to skip the tuple-level check
      *   buffer   - buffer of the page being written; an invalid buffer skips
      *              the page-level check
      *
      * Side effects: sets MyXactDidWrite to true.  Raises a serialization
      * failure error if MySerializableXact is already marked as doomed.
      */
     void
     CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
                                    Buffer buffer)
     {
         PREDICATELOCKTARGETTAG targettag;
     
         /* nothing to do when this write needs no serialization checks */
         if (!SerializationNeededForWrite(relation))
             return;
     
         /* Check if someone else has already decided that we need to die */
         if (SxactIsDoomed(MySerializableXact))
             ereport(ERROR,
                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                      errmsg("could not serialize access due to read/write dependencies among transactions"),
                      errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
                      errhint("The transaction might succeed if retried.")));
     
         /*
          * We're doing a write which might cause rw-conflicts now or later.
          * Memorize that fact.
          */
         MyXactDidWrite = true;
     
         /*
          * It is important that we check for locks from the finest granularity to
          * the coarsest granularity, so that granularity promotion doesn't cause
          * us to miss a lock.  The new (coarser) lock will be acquired before the
          * old (finer) locks are released.
          *
          * It is not possible to take and hold a lock across the checks for all
          * granularities because each target could be in a separate partition.
          */
         /* finest granularity: the tuple, identified by the block and offset in t_self */
         if (tuple != NULL)
         {
             SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
                                              relation->rd_node.dbNode,
                                              relation->rd_id,
                                              ItemPointerGetBlockNumber(&(tuple->t_self)),
                                              ItemPointerGetOffsetNumber(&(tuple->t_self)));
             CheckTargetForConflictsIn(&targettag);
         }
     
         /* next: the page, identified by the buffer's block number */
         if (BufferIsValid(buffer))
         {
             SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
                                             relation->rd_node.dbNode,
                                             relation->rd_id,
                                             BufferGetBlockNumber(buffer));
             CheckTargetForConflictsIn(&targettag);
         }
     
         /* coarsest: the whole relation */
         SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
                                             relation->rd_node.dbNode,
                                             relation->rd_id);
         CheckTargetForConflictsIn(&targettag);
     }
    

    4、START_CRIT_SECTION

    /* Global counter incremented by START_CRIT_SECTION(). */
    extern PGDLLIMPORT volatile uint32 CritSectionCount; 
    /* START_CRIT_SECTION: increment CritSectionCount by one. */
    #define START_CRIT_SECTION()  (CritSectionCount++)
    

    5、PageIsAllVisible

    通过位操作判断Page是否All Visible
    /*
     * PageIsAllVisible: nonzero when the PD_ALL_VISIBLE bit is set in the
     * page header's pd_flags.  "page" is cast to PageHeader.
     */
    #define PageIsAllVisible(page) \
         (((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
    

    6、PageClearAllVisible

    通过位操作清除All Visible标记
     /*
      * PageClearAllVisible: clear the PD_ALL_VISIBLE bit in the page header's
      * pd_flags, leaving the other flag bits untouched.
      */
     #define PageClearAllVisible(page) \
      (((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
    

    7、visibilitymap_clear

    //TODO 缓冲区管理相关的设置,待进一步理解
     /*
      *  visibilitymap_clear - clear specified bits for one page in visibility map
      *
      * You must pass a buffer containing the correct map page to this function.
      * Call visibilitymap_pin first to pin the right one. This function doesn't do
      * any I/O.  Returns true if any bits have been cleared and false otherwise.
      *
      * Parameters:
      *   rel     - relation the map belongs to (used here only for trace output)
      *   heapBlk - heap block whose map bits are to be cleared
      *   buf     - buffer holding the map page that covers heapBlk; an invalid
      *             buffer or one holding a different map block raises an ERROR
      *   flags   - bits to clear, positioned as for a single heap block; must
      *             include at least one of VISIBILITYMAP_VALID_BITS
      *
      * The map buffer is exclusive-locked while it is examined and modified,
      * and is marked dirty only when a bit actually changed.
      */
     bool
     visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf, uint8 flags)
     {
         BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
         int         mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
         int         mapOffset = HEAPBLK_TO_OFFSET(heapBlk);
         uint8       mask = flags << mapOffset;  /* flags moved to this block's slot in the byte */
         char       *map;
         bool        cleared = false;
     
         Assert(flags & VISIBILITYMAP_VALID_BITS);
     
     #ifdef TRACE_VISIBILITYMAP
         /* block numbers are unsigned, so print with %u */
         elog(DEBUG1, "vm_clear %s %u", RelationGetRelationName(rel), heapBlk);
     #endif
     
         if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
             elog(ERROR, "wrong buffer passed to visibilitymap_clear");
     
         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
         map = PageGetContents(BufferGetPage(buf));
     
         /* touch the page only if at least one requested bit is currently set */
         if (map[mapByte] & mask)
         {
             map[mapByte] &= ~mask;
     
             MarkBufferDirty(buf);
             cleared = true;
         }
     
         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     
         return cleared;
     }
    

    8、MarkBufferDirty

    //设置缓冲块为Dirty(待Flush到数据文件)
    //TODO 缓冲区相关管理
     /*
      * MarkBufferDirty
      *
      *      Marks buffer contents as dirty (actual write happens later).
      *
      * Buffer must be pinned and exclusive-locked.  (If caller does not hold
      * exclusive lock, then somebody could be in process of writing the buffer,
      * leading to risk of bad data written to disk.)
      *
      * Local buffers are delegated to MarkLocalBufferDirty().  For all other
      * buffers the BM_DIRTY and BM_JUST_DIRTIED bits are set in the state word
      * of the buffer descriptor using a compare-and-exchange retry loop.
      *
      * An invalid buffer id raises an ERROR.
      */
     void
     MarkBufferDirty(Buffer buffer)
     {
         BufferDesc *bufHdr;
         uint32      buf_state;
         uint32      old_buf_state;
     
         if (!BufferIsValid(buffer))
             elog(ERROR, "bad buffer ID: %d", buffer);
     
         /* Local buffers take a separate path and never reach the loop below. */
         if (BufferIsLocal(buffer))
         {
             MarkLocalBufferDirty(buffer);
             return;
         }
     
         /* The descriptor is looked up with buffer - 1. */
         bufHdr = GetBufferDescriptor(buffer - 1);
     
         Assert(BufferIsPinned(buffer));
         Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
                                     LW_EXCLUSIVE));
     
         /*
          * Set the dirty bits atomically: derive the new state from the last
          * observed state and retry until the compare-and-exchange succeeds.
          * NOTE(review): presumably a failed exchange refreshes old_buf_state
          * through the pointer argument -- confirm against the atomics API.
          */
         old_buf_state = pg_atomic_read_u32(&bufHdr->state);
         for (;;)
         {
             /* Header flagged BM_LOCKED: wait and continue from the state returned. */
             if (old_buf_state & BM_LOCKED)
                 old_buf_state = WaitBufHdrUnlocked(bufHdr);
     
             buf_state = old_buf_state;
     
             Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
             buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
     
             if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
                                                buf_state))
                 break;
         }
     
         /*
          * If the buffer was not dirty already, do vacuum accounting.
          */
         if (!(old_buf_state & BM_DIRTY))
         {
             VacuumPageDirty++;
             pgBufferUsage.shared_blks_dirtied++;
             if (VacuumCostActive)
                 VacuumCostBalance += VacuumCostPageDirty;
         }
     }
    

    9、RelationNeedsWAL

    判断数据表是否需要写WAL:仅当relpersistence为RELPERSISTENCE_PERMANENT(永久表,既非临时表也非unlogged表)时为真
     /*
      * RelationNeedsWAL
      *      True if relation needs WAL, i.e. the relpersistence field reached
      *      through rd_rel equals RELPERSISTENCE_PERMANENT.
      */
     #define RelationNeedsWAL(relation) \
         (RELPERSISTENCE_PERMANENT == (relation)->rd_rel->relpersistence)
    

    10、RelationIsAccessibleInLogicalDecoding

     /*
      * RelationIsAccessibleInLogicalDecoding
      *      True if we need to log enough information to have access via
      *      decoding snapshot.
      *
      * All three conditions must hold, and they are tested in this order:
      *   - XLogLogicalInfoActive(): logical replication information is active;
      *   - RelationNeedsWAL(relation): the relation needs to write WAL;
      *   - the relation is a catalog (IsCatalogRelation) or is used as a
      *     catalog table (RelationIsUsedAsCatalogTable).
      *
      * The per-condition notes live in this header because nothing may follow
      * a line-continuation backslash: a trailing comment after '\' stops the
      * line splice and truncates the macro.
      *
      * The argument is expanded more than once, so do not pass an expression
      * with side effects.
      */
     #define RelationIsAccessibleInLogicalDecoding(relation) \
         (XLogLogicalInfoActive() && \
          RelationNeedsWAL(relation) && \
          (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
    

    11、log_heap_new_cid

     /*
      * log_heap_new_cid
      *      Emit an XLOG_HEAP2_NEW_CID WAL record carrying the command ids
      *      stored in a tuple header.
      *
      * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
      * tuples.
      *
      * relation: relation holding the tuple; its rd_node is copied into the record.
      * tup: the tuple; t_self must be valid and t_tableOid must not be InvalidOid.
      *
      * Returns whatever XLogInsert() returns for the record.
      */
     static XLogRecPtr
     log_heap_new_cid(Relation relation, HeapTuple tup)
     {
         HeapTupleHeader tuphdr = tup->t_data;
         xl_heap_new_cid rec;
     
         Assert(ItemPointerIsValid(&tup->t_self));
         Assert(tup->t_tableOid != InvalidOid);
     
         rec.top_xid = GetTopTransactionId();
         rec.target_node = relation->rd_node;
         rec.target_tid = tup->t_self;
     
         if (tuphdr->t_infomask & HEAP_COMBOCID)
         {
             /*
              * Inserted & deleted in the same TX: we definitely have a
              * combocid, so both cmin and cmax are recorded.
              */
             Assert(!(tuphdr->t_infomask & HEAP_XMAX_INVALID));
             Assert(!HeapTupleHeaderXminInvalid(tuphdr));
             rec.cmin = HeapTupleHeaderGetCmin(tuphdr);
             rec.cmax = HeapTupleHeaderGetCmax(tuphdr);
             rec.combocid = HeapTupleHeaderGetRawCommandId(tuphdr);
         }
         else if (tuphdr->t_infomask & HEAP_XMAX_INVALID ||
                  HEAP_XMAX_IS_LOCKED_ONLY(tuphdr->t_infomask))
         {
             /*
              * No combocid, tuple inserted.  LOCK ONLY has to be checked too:
              * multixacts might be transferred to the new tuple in case of FOR
              * KEY SHARE updates, leaving an xmax on a freshly inserted tuple.
              */
             rec.cmin = HeapTupleHeaderGetRawCommandId(tuphdr);
             rec.cmax = InvalidCommandId;
             rec.combocid = InvalidCommandId;
         }
         else
         {
             /* No combocid, tuple from a different tx updated or deleted. */
             rec.cmin = InvalidCommandId;
             rec.cmax = HeapTupleHeaderGetRawCommandId(tuphdr);
             rec.combocid = InvalidCommandId;
         }
     
         /*
          * No buffer is registered: this operation does not modify the page.
          * The insert/update/delete that called us did, but that is WAL-logged
          * separately.
          */
         XLogBeginInsert();
         XLogRegisterData((char *) &rec, SizeOfHeapNewCid);
     
         /* will be looked at irrespective of origin */
         return XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
     }
    

    12、RelationIsLogicallyLogged
    判断是否需要为该数据表记录逻辑解码所需的信息:如需要,则在WAL中记录足够的信息,以便后续从日志中解析出数据

     /*
      * RelationIsLogicallyLogged
      *      True if we need to log enough information to extract the data from the
      *      WAL stream.
      *
      * We don't log information for unlogged tables (since they don't WAL log
      * anyway) and for system tables (their content is hard to make sense of, and
      * it would complicate decoding slightly for little gain). Note that we *do*
      * log information for user defined catalog tables since they presumably are
      * interesting to the user...
      *
      * The conditions are tested left to right with short-circuit evaluation.
      * The argument is expanded twice, so do not pass an expression with side
      * effects.
      */
     #define RelationIsLogicallyLogged(relation) \
         (XLogLogicalInfoActive() && \
          RelationNeedsWAL(relation) && \
          !IsCatalogRelation(relation))
     
    

    13、XLog*

    XLogBeginInsert
    XLogRegisterData
    XLogRegisterBuffer
    XLogRegisterBufData
    XLogSetRecordFlags
    XLogInsert
    

    14、PageSetLSN
    设置PageHeader的LSN(先前已解析)

     /*
      * PageSetLSN
      *      Store lsn into the pd_lsn field of the page header by way of
      *      PageXLogRecPtrSet.
      */
     #define PageSetLSN(page, lsn) \
      PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn)
    

    15、END_CRIT_SECTION

     /*
      * END_CRIT_SECTION
      *      Leave a critical section: assert that the counter is positive,
      *      then decrement CritSectionCount.  The do { } while(0) wrapper makes
      *      the two statements expand as a single statement.
      */
     #define END_CRIT_SECTION() \
     do { \
         Assert(CritSectionCount > 0); \
         CritSectionCount--; \
     } while(0)
    

    16、UnlockReleaseBuffer
    释放Buffer的内容锁(content lock)并解除pin

     /*
      * UnlockReleaseBuffer -- release the content lock and pin on a buffer
      *
      * This is just a shorthand for a common combination.
      *
      * buffer: the buffer to unlock and then release.  The lock is dropped
      * first (LockBuffer with BUFFER_LOCK_UNLOCK), the pin second
      * (ReleaseBuffer).
      */
     void
     UnlockReleaseBuffer(Buffer buffer)
     {
         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
         ReleaseBuffer(buffer);
     }
    

    17、ReleaseBuffer
    解除对Buffer的pin(Unpin);不再被pin的Buffer才可以被替换出去、重用于其他数据页

     /*
      * ReleaseBuffer -- drop one pin on a buffer
      *
      * buffer: the buffer to unpin; an invalid id raises an ERROR.
      *
      * Local buffers are forgotten by the current resource owner and have
      * their LocalRefCount entry decremented; every other buffer is handed
      * to UnpinBuffer().
      */
     void
     ReleaseBuffer(Buffer buffer)
     {
         if (!BufferIsValid(buffer))
             elog(ERROR, "bad buffer ID: %d", buffer);
     
         if (!BufferIsLocal(buffer))
         {
             UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
         }
         else
         {
             /* The local reference count is indexed with -buffer - 1. */
             ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
             Assert(LocalRefCount[-buffer - 1] > 0);
             LocalRefCount[-buffer - 1]--;
         }
     }
    

    18、CacheInvalidateHeapTuple
    为Tuple登记缓存失效(invalidation)事件,在命令结束时使相关的catcache/relcache缓存条目失效;只处理系统表(system catalog)中的Tuple。Insert/Delete时传入目标Tuple,Update时传入新旧两个版本。

     /*
      * CacheInvalidateHeapTuple
      *      Register the given tuple for invalidation at end of command
      *      (ie, current command is creating or outdating this tuple).
      *      Also, detect whether a relcache invalidation is implied.
      *
      * For an insert or delete, tuple is the target tuple and newtuple is NULL.
      * For an update, we are called just once, with tuple being the old tuple
      * version and newtuple the new version.  This allows avoidance of duplicate
      * effort during an update.
      *
      * Parameters:
      *      relation - relation the tuple belongs to; nothing is done unless it
      *                 is a system catalog that is not a TOAST table
      *      tuple    - target tuple (the old version, for an update)
      *      newtuple - new tuple version for an update, otherwise NULL
      *
      * Nothing is done during bootstrap processing either.
      */
     void
     CacheInvalidateHeapTuple(Relation relation,
                              HeapTuple tuple,
                              HeapTuple newtuple)
     {
         Oid         tupleRelId;
         Oid         databaseId;
         Oid         relationId;
     
         /* Do nothing during bootstrap */
         if (IsBootstrapProcessingMode())
             return;
     
         /*
          * We only need to worry about invalidation for tuples that are in system
          * catalogs; user-relation tuples are never in catcaches and can't affect
          * the relcache either.
          */
         if (!IsCatalogRelation(relation))
             return;
     
         /*
          * IsCatalogRelation() will return true for TOAST tables of system
          * catalogs, but we don't care about those, either.
          */
         if (IsToastRelation(relation))
             return;
     
         /*
          * If we're not prepared to queue invalidation messages for this
          * subtransaction level, get ready now.
          */
         PrepareInvalidationState();
     
         /*
          * First let the catcache do its thing
          */
         tupleRelId = RelationGetRelid(relation);
         if (RelationInvalidatesSnapshotsOnly(tupleRelId))
         {
             /* Shared relations are registered with InvalidOid as database id. */
             databaseId = IsSharedRelation(tupleRelId) ? InvalidOid : MyDatabaseId;
             RegisterSnapshotInvalidation(databaseId, tupleRelId);
         }
         else
             PrepareToInvalidateCacheTuple(relation, tuple, newtuple,
                                           RegisterCatcacheInvalidation);
     
         /*
          * Now, is this tuple one of the primary definers of a relcache entry? See
          * comments in file header for deeper explanation.
          *
          * Note we ignore newtuple here; we assume an update cannot move a tuple
          * from being part of one relcache entry to being part of another.
          */
         if (tupleRelId == RelationRelationId)
         {
             Form_pg_class classtup = (Form_pg_class) GETSTRUCT(tuple);
     
             relationId = HeapTupleGetOid(tuple);
             if (classtup->relisshared)
                 databaseId = InvalidOid;
             else
                 databaseId = MyDatabaseId;
         }
         else if (tupleRelId == AttributeRelationId)
         {
             Form_pg_attribute atttup = (Form_pg_attribute) GETSTRUCT(tuple);
     
             relationId = atttup->attrelid;
     
             /*
              * KLUGE ALERT: we always send the relcache event with MyDatabaseId,
              * even if the rel in question is shared (which we can't easily tell).
              * This essentially means that only backends in this same database
              * will react to the relcache flush request.  This is in fact
              * appropriate, since only those backends could see our pg_attribute
              * change anyway.  It looks a bit ugly though.  (In practice, shared
              * relations can't have schema changes after bootstrap, so we should
              * never come here for a shared rel anyway.)
              */
             databaseId = MyDatabaseId;
         }
         else if (tupleRelId == IndexRelationId)
         {
             Form_pg_index indextup = (Form_pg_index) GETSTRUCT(tuple);
     
             /*
              * When a pg_index row is updated, we should send out a relcache inval
              * for the index relation.  As above, we don't know the shared status
              * of the index, but in practice it doesn't matter since indexes of
              * shared catalogs can't have such updates.
              */
             relationId = indextup->indexrelid;
             databaseId = MyDatabaseId;
         }
         else
             /* Tuple is from none of the three catalogs tested above: done. */
             return;
     
         /*
          * Yes.  We need to register a relcache invalidation event.
          */
         RegisterRelcacheInvalidation(databaseId, relationId);
     }
    

    19、heap_freetuple
    释放内存

     /*
      * heap_freetuple
      *      Release a heap tuple by passing the HeapTuple pointer to pfree().
      *
      * htup: the tuple to free.
      */
     void
     heap_freetuple(HeapTuple htup)
     {
         pfree(htup);
     }
    

    二、源码解读

    heap_insert函数本身不复杂,最为复杂的子函数RelationGetBufferForTuple已在上一小节解析

    /*
     * heap_insert - insert a tuple into a heap relation
     *
     * Inputs:
     *     relation - the target relation
     *     tup      - the heap tuple (header plus data), i.e. the row to insert;
     *                its t_self is updated when a private copy was stored
     *     cid      - command id (ordering of the command)
     *     options  - HEAP_INSERT_* option flag bits
     *     bistate  - bulk-insert state
     * Returns:
     *     the value of HeapTupleGetOid(tup), i.e. the OID of the inserted
     *     tuple (not the OID of the relation)
     */
    Oid
    heap_insert(Relation relation, HeapTuple tup, CommandId cid,
                int options, BulkInsertState bistate)
    {
        TransactionId xid = GetCurrentTransactionId();// current transaction id
        HeapTuple   heaptup;// tuple actually stored (may be a private copy of tup)
        Buffer      buffer;// buffer holding the target heap page
        Buffer      vmbuffer = InvalidBuffer;// visibility map buffer
        bool        all_visible_cleared = false;// set when the all-visible flag was cleared
    
        /*
         * Fill in tuple header fields, assign an OID, and toast the tuple if
         * necessary.
         *
         * Note: below this point, heaptup is the data we actually intend to store
         * into the relation; tup is the caller's original untoasted data.
         */
        // pre-insert preparation, e.g. setting the t_infomask flags
        heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
    
        /*
         * Find buffer to insert this tuple into.  If the page is all visible,
         * this will also pin the requisite visibility map page.
         */
        // obtain the buffer to insert into (this helper was analysed earlier in the series)
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                           InvalidBuffer, options, bistate,
                                           &vmbuffer, NULL);
    
        /*
         * We're about to do the actual insert -- but check for conflict first, to
         * avoid possibly having to roll back work we've just done.
         *
         * This is safe without a recheck as long as there is no possibility of
         * another process scanning the page between this check and the insert
         * being visible to the scan (i.e., an exclusive buffer content lock is
         * continuously held from this point until the tuple insert is visible).
         *
         * For a heap insert, we only need to check for table-level SSI locks. Our
         * new tuple can't possibly conflict with existing tuple locks, and heap
         * page locks are only consolidated versions of tuple locks; they do not
         * lock "gaps" as index page locks do.  So we don't need to specify a
         * buffer when making the call, which makes for a faster check.
         */
        // check for serialization conflicts
        CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
    
        /* NO EREPORT(ERROR) from here till changes are logged */
        // enter the critical section (increments CritSectionCount)
        START_CRIT_SECTION();
        // place the tuple on the page (this function was analysed in the previous part)
        RelationPutHeapTuple(relation, buffer, heaptup,
                             (options & HEAP_INSERT_SPECULATIVE) != 0);
        // if the page is marked all-visible
        if (PageIsAllVisible(BufferGetPage(buffer)))
        {
            // clear the flag on the page and the bits in the visibility map
            all_visible_cleared = true;
            PageClearAllVisible(BufferGetPage(buffer));
            visibilitymap_clear(relation,
                                ItemPointerGetBlockNumber(&(heaptup->t_self)),
                                vmbuffer, VISIBILITYMAP_VALID_BITS);
        }
    
        /*
         * XXX Should we set PageSetPrunable on this page ?
         *
         * The inserting transaction may eventually abort thus making this tuple
         * DEAD and hence available for pruning. Though we don't want to optimize
         * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
         * aborted tuple will never be pruned until next vacuum is triggered.
         *
         * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
         */
        // mark the buffer dirty
        MarkBufferDirty(buffer);
    
        /* XLOG stuff */
        // write the WAL record, unless WAL is skipped or the relation needs none
        if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
        {
            xl_heap_insert xlrec;
            xl_heap_header xlhdr;
            XLogRecPtr  recptr;
            Page        page = BufferGetPage(buffer);
            uint8       info = XLOG_HEAP_INSERT;
            int         bufflags = 0;
    
            /*
             * If this is a catalog, we need to transmit combocids to properly
             * decode, so log that as well.
             */
            if (RelationIsAccessibleInLogicalDecoding(relation))
                log_heap_new_cid(relation, heaptup);
    
            /*
             * If this is the single and first tuple on page, we can reinit the
             * page instead of restoring the whole thing.  Set flag, and hide
             * buffer references from XLogInsert.
             */
            if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
                PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
            {
                info |= XLOG_HEAP_INIT_PAGE;
                bufflags |= REGBUF_WILL_INIT;
            }
    
            xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
            xlrec.flags = 0;
            if (all_visible_cleared)
                xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
            if (options & HEAP_INSERT_SPECULATIVE)
                xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
            Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
    
            /*
             * For logical decoding, we need the tuple even if we're doing a full
             * page write, so make sure it's included even if we take a full-page
             * image. (XXX We could alternatively store a pointer into the FPW).
             */
            if (RelationIsLogicallyLogged(relation))
            {
                xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
                bufflags |= REGBUF_KEEP_DATA;
            }
    
            XLogBeginInsert();
            XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
    
            xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
            xlhdr.t_infomask = heaptup->t_data->t_infomask;
            xlhdr.t_hoff = heaptup->t_data->t_hoff;
    
            /*
             * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
             * write the whole page to the xlog, we don't need to store
             * xl_heap_header in the xlog.
             */
            XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
            XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
            /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
            XLogRegisterBufData(0,
                                (char *) heaptup->t_data + SizeofHeapTupleHeader,
                                heaptup->t_len - SizeofHeapTupleHeader);
    
            /* filtering by origin on a row level is much more efficient */
            XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
    
            recptr = XLogInsert(RM_HEAP_ID, info);
    
            PageSetLSN(page, recptr);
        }
        // leave the critical section
        END_CRIT_SECTION();
        // unlock and unpin the heap buffer; also unpin the vm buffer if one was pinned
        UnlockReleaseBuffer(buffer);
        if (vmbuffer != InvalidBuffer)
            ReleaseBuffer(vmbuffer);
    
        /*
         * If tuple is cachable, mark it for invalidation from the caches in case
         * we abort.  Note it is OK to do this after releasing the buffer, because
         * the heaptup data structure is all in local memory, not in the shared
         * buffer.
         */
        // register the tuple for cache invalidation
        CacheInvalidateHeapTuple(relation, heaptup, NULL);
    
        /* Note: speculative insertions are counted too, even if aborted later */
        // update the statistics counters
        pgstat_count_heap_insert(relation, 1);
    
        /*
         * If heaptup is a private copy, release it.  Don't forget to copy t_self
         * back to the caller's image, too.
         */
        if (heaptup != tup)
        {
            tup->t_self = heaptup->t_self;
            heap_freetuple(heaptup);
        }
    
        return HeapTupleGetOid(tup);
    }
    
    

    三、跟踪分析

    插入一条记录,使用gdb进行跟踪分析:

    -- 这次启动事务
    testdb=# begin;
    BEGIN
    testdb=# select pg_backend_pid();
     pg_backend_pid 
    ----------------
               1556
    (1 row)
    testdb=# insert into t_insert values(11,'11','11','11');
    (挂起)
    
    

    启动gdb:

    [root@localhost ~]# gdb -p 1556
    GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
    Copyright (C) 2013 Free Software Foundation, Inc.
    ...
    (gdb) b heap_insert
    Breakpoint 1 at 0x4c343c: file heapam.c, line 2444.
    #输入参数:
    (gdb) p *relation
    $1 = {rd_node = {spcNode = 1663, dbNode = 16477, relNode = 26731}, rd_smgr = 0x0, rd_refcnt = 1, rd_backend = -1, rd_islocaltemp = false, rd_isnailed = false, rd_isvalid = true, 
      rd_indexvalid = 0 '\000', rd_statvalid = false, rd_createSubid = 0, rd_newRelfilenodeSubid = 0, rd_rel = 0x7f5fdd1771f0, rd_att = 0x7f5fdd177300, rd_id = 26731, rd_lockInfo = {lockRelId = {
          relId = 26731, dbId = 16477}}, rd_rules = 0x0, rd_rulescxt = 0x0, trigdesc = 0x0, rd_rsdesc = 0x0, rd_fkeylist = 0x0, rd_fkeyvalid = false, rd_partkeycxt = 0x0, rd_partkey = 0x0, rd_pdcxt = 0x0, 
      rd_partdesc = 0x0, rd_partcheck = 0x0, rd_indexlist = 0x0, rd_oidindex = 0, rd_pkindex = 0, rd_replidindex = 0, rd_statlist = 0x0, rd_indexattr = 0x0, rd_projindexattr = 0x0, rd_keyattr = 0x0, 
      rd_pkattr = 0x0, rd_idattr = 0x0, rd_projidx = 0x0, rd_pubactions = 0x0, rd_options = 0x0, rd_index = 0x0, rd_indextuple = 0x0, rd_amhandler = 0, rd_indexcxt = 0x0, rd_amroutine = 0x0, 
      rd_opfamily = 0x0, rd_opcintype = 0x0, rd_support = 0x0, rd_supportinfo = 0x0, rd_indoption = 0x0, rd_indexprs = 0x0, rd_indpred = 0x0, rd_exclops = 0x0, rd_exclprocs = 0x0, rd_exclstrats = 0x0, 
      rd_amcache = 0x0, rd_indcollation = 0x0, rd_fdwroutine = 0x0, rd_toastoid = 0, pgstat_info = 0x146f9b8}
    (gdb) p *tup
    $2 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_tableOid = 26731, t_data = 0x14b19f8}
    (gdb) p *(tup->t_data)
    $3 = {t_choice = {t_heap = {t_xmin = 244, t_xmax = 4294967295, t_field3 = {t_cid = 2249, t_xvac = 2249}}, t_datum = {datum_len_ = 244, datum_typmod = -1, datum_typeid = 2249}}, t_ctid = {ip_blkid = {
          bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_infomask2 = 4, t_infomask = 2, t_hoff = 24 '\030', t_bits = 0x14b1a0f ""}
    (gdb) p *(tup->t_data->t_bits)
    $4 = 0 '\000'
    (gdb) p cid
    $5 = 0
    (gdb) p options
    $6 = 0
    (gdb) p bistate
    $7 = (BulkInsertState) 0x0
    (gdb) next
    2447        Buffer      vmbuffer = InvalidBuffer;
    (gdb) p xid
    $8 = 1612859
    (gdb) next
    2448        bool        all_visible_cleared = false;
    (gdb) 
    2457        heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
    (gdb) 
    2463        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
    (gdb) p *heaptup
    $9 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_tableOid = 26731, t_data = 0x14b19f8}
    (gdb) next
    2482        CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
    (gdb) p buffer
    $10 = 185
    (gdb) next
    2485        START_CRIT_SECTION();
    (gdb) 
    2488                             (options & HEAP_INSERT_SPECULATIVE) != 0);
    (gdb) 
    2487        RelationPutHeapTuple(relation, buffer, heaptup,
    (gdb) 
    2490        if (PageIsAllVisible(BufferGetPage(buffer)))
    (gdb) 
    2510        MarkBufferDirty(buffer);
    (gdb) p buffer
    $11 = 185
    (gdb) next
    2513        if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
    (gdb) 
    2518            Page        page = BufferGetPage(buffer);
    (gdb) 
    2519            uint8       info = XLOG_HEAP_INSERT;
    (gdb) p *page
    $12 = 1 '\001'
    (gdb) p *(PageHeader)page
    $13 = {pd_lsn = {xlogid = 1, xrecoff = 3677481952}, pd_checksum = 0, pd_flags = 0, pd_lower = 64, pd_upper = 7552, pd_special = 8192, pd_pagesize_version = 8196, pd_prune_xid = 0, 
      pd_linp = 0x7f5fc5409318}
    (gdb) next
    2520            int         bufflags = 0;
    (gdb) 
    2526            if (RelationIsAccessibleInLogicalDecoding(relation))
    (gdb) 
    2534            if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
    (gdb) 
    2541            xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
    (gdb) 
    2542            xlrec.flags = 0;
    (gdb) 
    2543            if (all_visible_cleared)
    (gdb) 
    2545            if (options & HEAP_INSERT_SPECULATIVE)
    (gdb) 
    2554            if (RelationIsLogicallyLogged(relation))
    (gdb) 
    2560            XLogBeginInsert();
    (gdb) 
    2561            XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
    (gdb) p xlrec
    $14 = {offnum = 10, flags = 0 '\000'}
    (gdb) next
    2563            xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
    (gdb) 
    2564            xlhdr.t_infomask = heaptup->t_data->t_infomask;
    (gdb) 
    2565            xlhdr.t_hoff = heaptup->t_data->t_hoff;
    (gdb) 
    2572            XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
    (gdb) 
    2573            XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
    (gdb) 
    2577                                heaptup->t_len - SizeofHeapTupleHeader);
    (gdb) 
    2575            XLogRegisterBufData(0,
    (gdb) 
    2576                                (char *) heaptup->t_data + SizeofHeapTupleHeader,
    (gdb) 
    2575            XLogRegisterBufData(0,
    (gdb) 
    2580            XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
    (gdb) 
    2582            recptr = XLogInsert(RM_HEAP_ID, info);
    (gdb) 
    2584            PageSetLSN(page, recptr);
    (gdb) 
    2587        END_CRIT_SECTION();
    (gdb) 
    2589        UnlockReleaseBuffer(buffer);
    (gdb) 
    2590        if (vmbuffer != InvalidBuffer)
    (gdb) 
    2599        CacheInvalidateHeapTuple(relation, heaptup, NULL);
    (gdb) 
    2602        pgstat_count_heap_insert(relation, 1);
    (gdb) 
    2608        if (heaptup != tup)
    (gdb) 
    2614        return HeapTupleGetOid(tup);
    (gdb) p *tup
    $15 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 10}, t_tableOid = 26731, t_data = 0x14b19f8}
    (gdb) p *(tup->t_data)
    $16 = {t_choice = {t_heap = {t_xmin = 1612859, t_xmax = 0, t_field3 = {t_cid = 0, t_xvac = 0}}, t_datum = {datum_len_ = 1612859, datum_typmod = 0, datum_typeid = 0}}, t_ctid = {ip_blkid = {
          bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_infomask2 = 4, t_infomask = 2050, t_hoff = 24 '\030', t_bits = 0x14b1a0f ""}
    (gdb)
    (gdb) n
    2615    }
    (gdb) n
    #done!
    ExecInsert (mtstate=0x14b0c10, slot=0x14b1250, planSlot=0x14b1250, estate=0x14b08c0, canSetTag=true) at nodeModifyTable.c:534
    534             if (resultRelInfo->ri_NumIndices > 0) 
    

    四、小结

    1、简单的反面是复杂:插入一行数据,涉及缓冲区管理(在PG中还需要考虑死锁)、日志处理等一系列的细节,原理/理论是简单的,但要在工程上实现得漂亮,不容易!程序猿们,加油吧!
    2、NoSQL是“简单”的,RDBMS是“复杂”的:NoSQL不需要考虑事务,简化了日志处理,实现逻辑相对简单;RDBMS需要考虑A/B/C/D...,权衡了各种利弊,值得深入学习。

    相关文章

      网友评论

        本文标题:PostgreSQL 源码解读(4)- 插入数据#3(heap_

        本文链接:https://www.haomeiwen.com/subject/tfxrvftx.html