美文网首页Amazing Arch程序员
Redis-Shake【二】 Sync功能实现简介

Redis-Shake【二】 Sync功能实现简介

作者: 杨_同_学 | 来源:发表于2019-10-28 19:47 被阅读0次

Redis-Shake【一】简要介绍
Redis-Shake【二】 Sync功能实现简介

上一节简要介绍了一下Redis-Shake的使用,接下来详细说一下Redis-Shake的sync模式。

sync模式的基本原理是让Redis-Shake模拟成一个redis slave,直接对源redis实例执行sync/psync命令,该模式主要包含两部分:全量同步(full)和增量同步(increment)。


Redis-Shake sync模式原理图.png

sync模式的入口函数,遍历所有的SourceAddressList,为每个源实例创建syncNode对象,并用多个协程并发执行dbSyncer的sync方法,开始同步逻辑。

// 主要代码在redis-shake/sync.go文件中,入口函数是Main()
// Main is the entry point of sync mode. It builds one syncNode per source
// redis address, queues them on a channel, and starts a bounded pool of
// worker goroutines (SourceRdbParallel of them) that each run dbSyncer.sync.
// A worker only picks up the next source after the current one finishes its
// full (RDB) phase, so at most SourceRdbParallel sources are in the full
// phase at any time. Main never returns: incremental syncing keeps running.
func (cmd *CmdSync) Main() {
	type syncNode struct {
		id             int
		source         string
		sourcePassword string
		target         []string
		targetPassword string
	}

	// source redis number
	total := utils.GetTotalLink()
	syncChan := make(chan syncNode, total)
	cmd.dbSyncers = make([]*dbSyncer, total)
	// One syncNode per entry in SourceAddressList.
	for i, source := range conf.Options.SourceAddressList {
		var target []string
		if conf.Options.TargetType == conf.RedisTypeCluster {
			target = conf.Options.TargetAddressList
		} else {
			// round-robin pick
			pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
			target = []string{conf.Options.TargetAddressList[pick]}
		}

		nd := syncNode{
			id:             i,
			source:         source,
			sourcePassword: conf.Options.SourcePasswordRaw,
			target:         target,
			targetPassword: conf.Options.TargetPasswordRaw,
		}
		syncChan <- nd
	}

	var wg sync.WaitGroup
	wg.Add(len(conf.Options.SourceAddressList))
	// SourceRdbParallel bounds how many source instances may be handled
	// concurrently (i.e. how many RDB loads run at once).
	for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
		// Shadow the loop variable: before Go 1.22 every goroutine spawned
		// below would otherwise share one i, giving workers wrong (possibly
		// duplicate) HttpProfile ports.
		i := i
		go func() {
			for {
				nd, ok := <-syncChan
				if !ok {
					break
				}

				ds := NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword,
					conf.Options.HttpProfile+i)
				cmd.dbSyncers[nd.id] = ds
				log.Infof("routine[%v] starts syncing data from %v to %v with http[%v]",
					ds.id, ds.source, ds.target, ds.httpProfilePort)
				// run in routine
				go ds.sync()

				// Block until the full phase is done: sync() closes
				// waitFull when it switches to incremental mode.
				<-ds.waitFull

				wg.Done()
			}
		}()
	}

	wg.Wait()
	close(syncChan)

	// never quit because increment syncing is still running
	select {}
}

dbSyncer.sync 对源Redis执行Sync/Psync命令,并依次执行全量和增量同步

// sync drives one source->target replication session: it issues SYNC/PSYNC
// against the source redis (acting as a fake slave), streams the RDB snapshot
// (full phase), then forwards the incremental command stream (incr phase).
// It does not return (syncCommand loops forever).
// NOTE: "..." below marks code elided in this excerpt.
func (ds *dbSyncer) sync() {
    var sockfile *os.File
    if len(conf.Options.SockFileName) != 0 {
        sockfile = utils.OpenReadWriteFile(conf.Options.SockFileName)
        defer sockfile.Close()
    }

    // base.Status tracks the phase this syncer is in: waitfull -> full -> incr.
    base.Status = "waitfull"
    var input io.ReadCloser
    var nsize int64
    // Issue sync/psync to the source; input is the replication stream and
    // nsize the RDB payload size reported by the source.
    if conf.Options.Psync {
        input, nsize = ds.sendPSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    } else {
        input, nsize = ds.sendSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    }
    defer input.Close()

    ...

    reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)

    // Full phase: parse the RDB stream and restore it into the target.
    base.Status = "full"
    ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize, conf.Options.TargetTLSEnable)

    // Incremental phase. Closing waitFull signals Main() that the full phase
    // is done, letting the worker pool admit the next source instance.
    base.Status = "incr"
    close(ds.waitFull)
    ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, conf.Options.TargetTLSEnable)
}

全量同步阶段支持并发写入目标Redis,通过对目标Redis执行restore命令来实现key数据的写入

// syncRDBFile performs the full-sync phase: it parses RDB entries from
// reader and writes them into the target redis concurrently (Parallel
// writers), using RESTORE for each key. It blocks until all entries are
// written, printing progress once per second.
// NOTE: "..." below marks code elided in this excerpt.
func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, tlsEnable bool) {
    // pipe delivers entries parsed from the source's RDB stream.
    pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize)
    wait := make(chan struct{})
    // Writes to the target are parallelized; conf.Options.Parallel sets the
    // number of writer goroutines.
    go func() {
        // Closed once every writer has drained the pipe.
        defer close(wait)
        var wg sync.WaitGroup
        wg.Add(conf.Options.Parallel)
        for i := 0; i < conf.Options.Parallel; i++ {
            go func() {
                defer wg.Done()
                c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster,
                    tlsEnable)
                defer c.Close()
                var lastdb uint32 = 0
                // Consume entries parsed from the source RDB.
                for e := range pipe {
                    // DB-level filtering (FilterDBBlacklist / FilterDBWhitelist).
                    if filter.FilterDB(int(e.DB)) {
                        // db filter
                        ds.ignore.Incr()
                    } else {
                        ds.nentry.Incr()

                        ...
                        // Key-level filtering (FilterKeyBlacklist / FilterKeyWhitelist).
                        if filter.FilterKey(string(e.Key)) == true {
                            // 1. judge if not pass filter key
                            ds.ignore.Incr()
                            continue
                        } else {
                            slot := int(utils.KeyToSlot(string(e.Key)))
                            if filter.FilterSlot(slot) == true {
                                // 2. judge if not pass filter slot
                                ds.ignore.Incr()
                                continue
                            }
                        }

                        log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
                        // RESTORE the entry into the target redis.
                        utils.RestoreRdbEntry(c, e)
                        log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key)
                    }
                }
            }()
        }

        wg.Wait()
    }()

    var stat *syncerStat

    // Progress loop: wake up at least once per second (or when writers
    // finish) and log byte/entry counters.
    // NOTE(review): 100*stat.rbytes/nsize divides by nsize — assumes the
    // source always reports nsize > 0; confirm for diskless replication.
    for done := false; !done; {
        select {
        case <-wait:
            done = true
        case <-time.After(time.Second):
        }
        stat = ds.Stat()
        var b bytes.Buffer
        // fmt.Fprintf(&b, "dbSyncer[%v] total=%s - %12d [%3d%%]  entry=%-12d",
        fmt.Fprintf(&b, "dbSyncer[%v] total = %s - %12s [%3d%%]  entry=%-12d",
            ds.id, utils.GetMetric(nsize), utils.GetMetric(stat.rbytes), 100*stat.rbytes/nsize, stat.nentry)
        if stat.ignore != 0 {
            fmt.Fprintf(&b, "  ignore=%-12d", stat.ignore)
        }
        log.Info(b.String())
        metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
    }
    log.Infof("dbSyncer[%v] sync rdb done", ds.id)
}

增量写入阶段

// syncCommand performs the incremental phase: it decodes the live command
// stream from the source (reader), filters it, and forwards write commands
// to the target. It starts four goroutines (offset poller, reply receiver,
// command puller, command sender) and then loops forever printing per-second
// stats — it never returns.
// NOTE: "..." below marks code elided in this excerpt.
func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) {
    readeTimeout := time.Duration(10) * time.Minute
    writeTimeout := time.Duration(10) * time.Minute
    isCluster := conf.Options.TargetType == conf.RedisTypeCluster
    // Open the connection to the target redis.
    c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable)
    defer c.Close()

    ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount)
    ds.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize)
    var sendId, recvId, sendMarkId atomic2.Int64 // sendMarkId is also used as mark the sendId in sender routine
    // Goroutine 1: periodically query the source for the replication offset
    // of the fake slave that redis-shake emulates.
    // NOTE(review): srcConn is never closed and the ticker never stopped —
    // apparently intentional since this runs for the process lifetime.
    go func() {
        ...
        srcConn := utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword,
            readeTimeout, writeTimeout, false, conf.Options.SourceTLSEnable)
        ticker := time.NewTicker(10 * time.Second)
        for range ticker.C {
            offset, err := utils.GetFakeSlaveOffset(srcConn)
            if err != nil {
                ...
            } else {
                // Update the sourceOffset metric.
                if ds.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil {
                    log.Errorf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }
        }
    }()
    // Goroutine 2: read replies from the target for the commands we sent;
    // feeds the successCount/failCount and delay metrics.
    go func() {
        var node *delayNode
        for {
            reply, err := c.Receive()

            recvId.Incr()
            id := recvId.Get() // receive id

            // print debug log of receive reply
            log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err)

            if conf.Options.Metric == false {
                continue
            }

            if err == nil {
                metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
            } else {
                metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
                if utils.CheckHandleNetError(err) {
                    log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                } else {
                    log.Panicf("dbSyncer[%v] Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }

            if node == nil {
                // non-blocking read from delay channel
                select {
                case node = <-ds.delayChannel:
                default:
                    // it's ok, channel is empty
                }
            }

            // Match the received reply against the pending delay node to
            // measure round-trip latency in milliseconds.
            if node != nil {
                if node.id == id {
                    metric.GetMetric(ds.id).AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms
                    node = nil
                } else if node.id < id {
                    log.Panicf("dbSyncer[%v] receive id invalid: node-id[%v] < receive-id[%v]",
                        ds.id, node.id, id)
                }
            }
        }
    }()
    // Goroutine 3: decode the incremental command stream coming from the
    // source, apply db/command/key filters, and queue survivors on sendBuf.
    go func() {
        var (
            lastdb        int32 = 0
            bypass              = false
            isselect            = false
            scmd          string
            argv, newArgv [][]byte
            err           error
            reject        bool
        )

        decoder := redis.NewDecoder(reader)

        log.Infof("dbSyncer[%v] Event:IncrSyncStart\tId:%s\t", ds.id, conf.Options.Id)

        for {
            ignorecmd := false
            isselect = false
            resp := redis.MustDecodeOpt(decoder)

            if scmd, argv, err = redis.ParseArgs(resp); err != nil {
                log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
            } else {
                metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)

                ...

                if scmd != "ping" {
                    if strings.EqualFold(scmd, "select") {
                        if len(argv) != 1 {
                            log.Panicf("dbSyncer[%v] select command len(args) = %d", ds.id, len(argv))
                        }
                        s := string(argv[0])
                        n, err := strconv.Atoi(s)
                        if err != nil {
                            log.PanicErrorf(err, "dbSyncer[%v] parse db = %s failed", ds.id, s)
                        }
                        // A SELECT into a filtered db bypasses everything
                        // until the next SELECT.
                        bypass = filter.FilterDB(n)
                        isselect = true
                    } else if filter.FilterCommands(scmd) {
                        ignorecmd = true
                    }
                    if bypass || ignorecmd {
                        ds.nbypass.Incr()
                        // ds.SyncStat.BypassCmdCount.Incr()
                        metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                        log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
                        continue
                    }
                }
                // Key-level filtering for write commands
                // (FilterKeyWhitelist / FilterKeyBlacklist).
                newArgv, reject = filter.HandleFilterKeyWithCommand(scmd, argv)
                if bypass || ignorecmd || reject {
                    ds.nbypass.Incr()
                    metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                    log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
                    continue
                }
            }

            ...
            // Queue the filtered/parsed command for the sender goroutine.
            ds.sendBuf <- cmdDetail{Cmd: scmd, Args: newArgv}
        }
    }()
    // Goroutine 4: drain sendBuf and write the commands to the target redis.
    go func() {
        var noFlushCount uint
        var cachedSize uint64

        for item := range ds.sendBuf {
            length := len(item.Cmd)
            data := make([]interface{}, len(item.Args))
            for i := range item.Args {
                data[i] = item.Args[i]
                length += len(item.Args[i])
            }
            // Send the command to the target.
            err := c.Send(item.Cmd, data...)
            ...
        }
    }()
    // Block the current goroutine forever, printing per-second deltas of
    // forwarded/filtered command counts and written bytes.
    for lstat := ds.Stat(); ; {
        time.Sleep(time.Second)
        nstat := ds.Stat()
        var b bytes.Buffer
        fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id)
        fmt.Fprintf(&b, " +forwardCommands=%-6d", nstat.forward-lstat.forward)
        fmt.Fprintf(&b, " +filterCommands=%-6d", nstat.nbypass-lstat.nbypass)
        fmt.Fprintf(&b, " +writeBytes=%d", nstat.wbytes-lstat.wbytes)
        log.Info(b.String())
        lstat = nstat
    }
}

相关文章

网友评论

    本文标题:Redis-Shake【二】 Sync功能实现简介

    本文链接:https://www.haomeiwen.com/subject/bzubvctx.html