美文网首页tidb一些收藏
一例TiDB DM同步任务写冲突的分析与解决

一例TiDB DM同步任务写冲突的分析与解决

作者: LittleMagic | 来源:发表于2021-02-08 18:39 被阅读0次

    提出问题

    我们当前通过5个DM任务从RDS MySQL向TiDB同步数据。这些任务均非合库合表任务,且同步的库表相互之间没有交集,safe-mode均未显式打开,Syncer线程数16。且除DM任务外,几乎没有其他写入动作。

    同步开始后,通过Grafana的TiDB/KV Errors面板观察到持续的写冲突,如下图所示。

    同时AlertManager出现大量关于事务重试的报警。

    尝试逐个停止DM任务,发现一旦停止其中流量最大的那一个(接入40+张表,数千QPS),写冲突即消失。

    分析问题

    查看TiDB Server的日志,其中频繁打印prewrite encounters lock,表示预写阶段有锁冲突,并且DM Syncer写入采用的是乐观事务。

    [2021/02/08 15:47:40.112 +08:00] [INFO] [2pc.go:822] ["prewrite encounters lock"] [conn=0] [lock="key: {metaKey=true, key=DB:820, field=TID:864}, primary: {metaKey=true, key=DB:820, field=TID:864}, txnStartTS: 422778099492716553, lockForUpdateTS:0, ttl: 3001, type: Put"]
    [2021/02/08 15:47:40.114 +08:00] [WARN] [txn.go:66] [RunInNewTxn] ["retry txn"=422778099492716555] ["original txn"=422778099492716555] [error="[kv:9007]Write conflict, txnStartTS=422778099492716555, conflictStartTS=422778099492716553, conflictCommitTS=422778099492716557, key={metaKey=true, key=DB:820, field=TID:864} primary={metaKey=true, key=DB:820, field=TID:864} [try again later]"]
    [2021/02/08 15:47:40.116 +08:00] [INFO] [2pc.go:1336] ["2PC clean up done"] [txnStartTS=422778099492716555]
    [2021/02/08 15:47:40.126 +08:00] [WARN] [txn.go:66] [RunInNewTxn] ["retry txn"=422778099492716568] ["original txn"=422778099492716568] [error="[kv:9007]Write conflict, txnStartTS=422778099492716568, conflictStartTS=422778099492716567, conflictCommitTS=422778099492716570, key={metaKey=true, key=DB:820, field=TID:862} primary={metaKey=true, key=DB:820, field=TID:862} [try again later]"]
    [2021/02/08 15:47:40.127 +08:00] [INFO] [2pc.go:1336] ["2PC clean up done"] [txnStartTS=422778099492716568]
    [2021/02/08 15:47:40.305 +08:00] [INFO] [2pc.go:822] ["prewrite encounters lock"] [conn=0] [lock="key: {metaKey=true, key=DB:820, field=TID:862}, primary: {metaKey=true, key=DB:820, field=TID:862}, txnStartTS: 422778099532038211, lockForUpdateTS:0, ttl: 3001, type: Put"]
    [2021/02/08 15:47:40.309 +08:00] [WARN] [txn.go:66] [RunInNewTxn] ["retry txn"=422778099532038213] ["original txn"=422778099532038213] [error="[kv:9007]Write conflict, txnStartTS=422778099532038213, conflictStartTS=422778099532038211, conflictCommitTS=422778099545145351, key={metaKey=true, key=DB:820, field=TID:862} primary={metaKey=true, key=DB:820, field=TID:862} [try again later]"]
    [2021/02/08 15:47:40.311 +08:00] [INFO] [2pc.go:1336] ["2PC clean up done"] [txnStartTS=422778099532038213]
    [2021/02/08 15:47:40.365 +08:00] [INFO] [2pc.go:822] ["prewrite encounters lock"] [conn=0] [lock="key: {metaKey=true, key=DB:820, field=TID:862}, primary: {metaKey=true, key=DB:820, field=TID:862}, txnStartTS: 422778099558252548, lockForUpdateTS:0, ttl: 3001, type: Put"]
    [2021/02/08 15:47:40.367 +08:00] [WARN] [txn.go:66] [RunInNewTxn] ["retry txn"=422778099558252549] ["original txn"=422778099558252549] [error="[kv:9007]Write conflict, txnStartTS=422778099558252549, conflictStartTS=422778099558252548, conflictCommitTS=422778099558252551, key={metaKey=true, key=DB:820, field=TID:862} primary={metaKey=true, key=DB:820, field=TID:862} [try again later]"]
    [2021/02/08 15:47:40.368 +08:00] [INFO] [2pc.go:1336] ["2PC clean up done"] [txnStartTS=422778099558252549]
    [2021/02/08 15:47:41.514 +08:00] [INFO] [2pc.go:822] ["prewrite encounters lock"] [conn=0] [lock="key: {metaKey=true, key=DB:820, field=TID:862}, primary: {metaKey=true, key=DB:820, field=TID:862}, txnStartTS: 422778099859718155, lockForUpdateTS:0, ttl: 3001, type: Put"]
    [2021/02/08 15:47:41.516 +08:00] [WARN] [txn.go:66] [RunInNewTxn] ["retry txn"=422778099859718158] ["original txn"=422778099859718158] [error="[kv:9007]Write conflict, txnStartTS=422778099859718158, conflictStartTS=422778099859718155, conflictCommitTS=422778099859718160, key={metaKey=true, key=DB:820, field=TID:862} primary={metaKey=true, key=DB:820, field=TID:862} [try again later]"]
    [2021/02/08 15:47:41.517 +08:00] [INFO] [2pc.go:1336] ["2PC clean up done"] [txnStartTS=422778099859718158]
    

    但是,TiKV日志中并未发现与写冲突相关的任何信息(几乎都是与not leader相关的)。参考官方文档“乐观事务模型下写写冲突问题排查”一节,同样无法从上述日志中定位出冲突的数据及主键信息(没有tableID、indexID、handle等有效的字段)。

    那么,形如key={metaKey=true, key=DB:820, field=TID:862}的日志是在哪里输出的?既然文档不能解决问题,那么就直接上源码。来到store/tikv/snapshot.go文件,部分代码如下。

    func newWriteConflictError(conflict *pb.WriteConflict) error {
        var buf bytes.Buffer
        prettyWriteKey(&buf, conflict.Key)
        buf.WriteString(" primary=")
        prettyWriteKey(&buf, conflict.Primary)
        return kv.ErrWriteConflict.FastGenByArgs(conflict.StartTs, conflict.ConflictTs, conflict.ConflictCommitTs, buf.String())
    }
    
    func prettyWriteKey(buf *bytes.Buffer, key []byte) {
        tableID, indexID, indexValues, err := tablecodec.DecodeIndexKey(key)
        if err == nil {
            _, err1 := fmt.Fprintf(buf, "{tableID=%d, indexID=%d, indexValues={", tableID, indexID)
            // ...
            return
        }
    
        tableID, handle, err := tablecodec.DecodeRecordKey(key)
        if err == nil {
            _, err3 := fmt.Fprintf(buf, "{tableID=%d, handle=%d}", tableID, handle)
            // ...
            return
        }
    
        mKey, mField, err := tablecodec.DecodeMetaKey(key)
        if err == nil {
            _, err3 := fmt.Fprintf(buf, "{metaKey=true, key=%s, field=%s}", string(mKey), string(mField))
            // ...
            return
        }
        // ...
    }
    

    可见,当产生写冲突时,prettyWriteKey()函数会负责输出冲突的key信息,而带有metaKey=true的自然是表示元数据key有冲突。从tablecodec.DecodeMetaKey()方法中并不能得到关于元数据的太多细节,继续来到源码meta/meta.go文件,其注释恰好描述了元数据的结构。

    Meta structure:
        NextGlobalID -> int64
        SchemaVersion -> int64
        DBs -> {
            DB:1 -> db meta data []byte
            DB:2 -> db meta data []byte
        }
        DB:1 -> {
            Table:1 -> table meta data []byte
            Table:2 -> table meta data []byte
            TID:1 -> int64
            TID:2 -> int64
        }
    

    执行curl [tidb_addr]:10080/db-table/[TID]命令,通过TID(等同于tableID)可以查询到对应的表名及库名。上述TID为862的表是一个写入量较大的业务表,但按照常理也不应出现如此频繁的写冲突,所以问题只可能出现在该表对应的元数据内部。

    继续向下看与元数据相关的字段。

    var (
        mMetaPrefix       = []byte("m")
        mNextGlobalIDKey  = []byte("NextGlobalID")
        mSchemaVersionKey = []byte("SchemaVersionKey")
        mDBs              = []byte("DBs")
        mDBPrefix         = "DB"
        mTablePrefix      = "Table"
        mSequencePrefix   = "SID"
        mSeqCyclePrefix   = "SequenceCycle"
        mTableIDPrefix    = "TID"
        mRandomIDPrefix   = "TARID"
        mBootstrapKey     = []byte("BootstrapKey")
        mSchemaDiffPrefix = "Diff"
    )
    

    通过mTableIDPrefix、mRandomIDPrefix等字段可以推测,表元数据内维护了当前自动生成的ID。继续查看meta/autoid/autoid.go,能够看到自动ID的分配器(即Allocator接口的实现)有如下4种,刚好与上面的元数据定义对得上。

    const (
        // RowIDAllocType indicates the allocator is used to allocate row id.
        RowIDAllocType AllocatorType = iota
        // AutoIncrementType indicates the allocator is used to allocate auto increment value.
        AutoIncrementType
        // AutoRandomType indicates the allocator is used to allocate auto-shard id.
        AutoRandomType
        // SequenceType indicates the allocator is used to allocate sequence value.
        SequenceType
    )
    

    通过自动生成ID的函数generateAutoIDByAllocType()向下追溯可知,TiDB对RowIDAllocType和AutoIncrementType的处理方式相同,也就是说行ID和自增ID都是维护在以TID为前缀的元数据key对应的value中

    func generateAutoIDByAllocType(m *meta.Meta, dbID, tableID, step int64, allocType AllocatorType) (int64, error) {
        switch allocType {
        case RowIDAllocType, AutoIncrementType:
            return m.GenAutoTableID(dbID, tableID, step)
        case AutoRandomType:
            return m.GenAutoRandomID(dbID, tableID, step)
        case SequenceType:
            return m.GenSequenceValue(dbID, tableID, step)
        default:
            return 0, ErrInvalidAllocatorType.GenWithStackByArgs()
        }
    }
    
    // GenAutoTableID adds step to the auto ID of the table and returns the sum.
    func (m *Meta) GenAutoTableID(dbID, tableID, step int64) (int64, error) {
        // Check if DB exists.
        dbKey := m.dbKey(dbID)
        if err := m.checkDBExists(dbKey); err != nil {
            return 0, errors.Trace(err)
        }
        // Check if table exists.
        tableKey := m.tableKey(tableID)
        if err := m.checkTableExists(dbKey, tableKey); err != nil {
            return 0, errors.Trace(err)
        }
        return m.txn.HInc(dbKey, m.autoTableIDKey(tableID), step)
    }
    
    func (m *Meta) autoTableIDKey(tableID int64) []byte {
        return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
    }
    

    查看TID为862的表schema,发现其主键定义为bigint(20) NOT NULL AUTO_INCREMENT类型,所以高度怀疑是该表的自增ID引起了写冲突

    由于DM同步任务插入数据是采用INSERT INTO VALUES(...)语法,故来到executor/insert_common.go的insertRows()函数,它负责处理此类SQL语句。

    // insertRows processes `insert|replace into values ()` or `insert|replace into set x=y`
    func insertRows(ctx context.Context, base insertCommon) (err error) {
        e := base.insertCommon()
        // ...
        e.lazyFillAutoID = true
        // ...
        for i, list := range e.Lists {
            e.rowCount++
            var row []types.Datum
            row, err = evalRowFunc(ctx, list, i)
            if err != nil {
                return err
            }
            rows = append(rows, row)
            if batchInsert && e.rowCount%uint64(batchSize) == 0 {
                // ...
                // Before batch insert, fill the batch allocated autoIDs.
                rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
                if err != nil {
                    return err
                }
                // ...
            }
        }
        // ...
    }
    

    根据注释,lazyAdjustAutoIncrementDatum()函数用来填充此批次内的自动ID。注意到它首先会尝试获取插入数据中自动ID列对应的数据,如果非空且非0,就会直接使用该ID,但同时会调用Table.RebaseAutoID()方法来根据当前ID重置自动ID的起点。RebaseAutoID()方法实际调用的是各Allocator的Rebase()方法。

    func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
        // ...
        for processedIdx := 0; processedIdx < rowCount; processedIdx++ {
            autoDatum := rows[processedIdx][idx]
    
            var err error
            var recordID int64
            if !autoDatum.IsNull() {
                recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true)
                if err != nil {
                    return nil, err
                }
            }
            // Use the value if it's not null and not 0.
            if recordID != 0 {
                err = e.Table.RebaseAutoID(e.ctx, recordID, true, autoid.RowIDAllocType)
                if err != nil {
                    return nil, err
                }
                e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
                retryInfo.AddAutoIncrementID(recordID)
                continue
            }
            // ...
        }
        // ...
    }
    
    func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error {
        if tableID == 0 {
            return errInvalidTableID.GenWithStack("Invalid tableID")
        }
    
        alloc.mu.Lock()
        defer alloc.mu.Unlock()
    
        if alloc.isUnsigned {
            return alloc.rebase4Unsigned(tableID, uint64(requiredBase), allocIDs)
        }
        return alloc.rebase4Signed(tableID, requiredBase, allocIDs)
    }
    

    不论是有符号还是无符号的rebase,都会调用kv.RunInNewTxn()方法(注意到它出现在了上文TiDB的日志中)来启动一个新事务来尝试调整自动ID的区间。

    func (alloc *allocator) rebase4Unsigned(tableID int64, requiredBase uint64, allocIDs bool) error {
        // ...
        err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
            m := meta.NewMeta(txn)
            currentEnd, err1 := getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType)
            if err1 != nil {
                return err1
            }
            uCurrentEnd := uint64(currentEnd)
            if allocIDs {
                newBase = mathutil.MaxUint64(uCurrentEnd, requiredBase)
                newEnd = mathutil.MinUint64(math.MaxUint64-uint64(alloc.step), newBase) + uint64(alloc.step)
            } else {
                if uCurrentEnd >= requiredBase {
                    newBase = uCurrentEnd
                    newEnd = uCurrentEnd
                    return nil
                }
                newBase = requiredBase
                newEnd = requiredBase
            }
            _, err1 = generateAutoIDByAllocType(m, alloc.dbID, tableID, int64(newEnd-uCurrentEnd), alloc.allocType)
            return err1
        })
        // ...
    }
    

    推源码推到这里,答案已经呼之欲出了。询问业务侧对此表的写入方式,答复是插入数据时显式指定了自增列的值。由于TiDB是采用分段缓存的方式维护自增ID的(详情查看官方文档中对AUTO_INCREMENT的解释),显式插入的自增ID值大概率会导致自动分配的ID区间频繁rebase。再加上我们是采用LB组件下挂3个TiDB Server的方式作为DM的target,多个TiDB实例之间还会争抢自增ID的分段,使写冲突更加严重。

    解决问题

    简单粗暴的方法是要求业务端不要指定ID,但代价比较大,故我们尝试去掉此表主键列的自增属性。设置系统变量:

    SET SESSION tidb_allow_remove_auto_inc = 1;
    

    然后执行ALTER TABLE语句:

    ALTER TABLE warehouse_db_new.warehouse_sku
    MODIFY sku_id bigint(20) NOT NULL COMMENT 'SKU ID';
    

    执行完毕后,写冲突明显下降,大功告成。

    The End

    还有几天就过年了,预祝大佬们春节快乐~

    相关文章

      网友评论

        本文标题:一例TiDB DM同步任务写冲突的分析与解决

        本文链接:https://www.haomeiwen.com/subject/rzmxxltx.html