美文网首页
聊聊machinery的Lock

聊聊machinery的Lock

作者: go4it | 来源:发表于2021-04-03 23:52 被阅读0次

    本文主要研究一下machinery的Lock

    Lock

    type Lock interface {
        //Acquire the lock with retry
        //key: the name of the lock,
        //value: at the nanosecond timestamp that lock needs to be released automatically
        LockWithRetries(key string, value int64) error
    
        //Acquire the lock with once
        //key: the name of the lock,
        //value: at the nanosecond timestamp that lock needs to be released automatically
        Lock(key string, value int64) error
    }
    

    Lock接口定义了LockWithRetries、Lock方法

    redis/Lock

    var (
        ErrRedisLockFailed = errors.New("redis lock: failed to acquire lock")
    )
    
    type Lock struct {
        rclient  redis.UniversalClient
        retries  int
        interval time.Duration
    }
    
    func New(cnf *config.Config, addrs []string, db, retries int) Lock {
        if retries <= 0 {
            return Lock{}
        }
        lock := Lock{retries: retries}
    
        var password string
    
        parts := strings.Split(addrs[0], "@")
        if len(parts) == 2 {
            password = parts[0]
            addrs[0] = parts[1]
        }
    
        ropt := &redis.UniversalOptions{
            Addrs:    addrs,
            DB:       db,
            Password: password,
        }
        if cnf.Redis != nil {
            ropt.MasterName = cnf.Redis.MasterName
        }
    
        lock.rclient = redis.NewUniversalClient(ropt)
    
        return lock
    }
    
    func (r Lock) LockWithRetries(key string, unixTsToExpireNs int64) error {
        for i := 0; i <= r.retries; i++ {
            err := r.Lock(key, unixTsToExpireNs)
            if err == nil {
                //成功拿到锁,返回
                return nil
            }
    
            time.Sleep(r.interval)
        }
        return ErrRedisLockFailed
    }
    
    func (r Lock) Lock(key string, unixTsToExpireNs int64) error {
        now := time.Now().UnixNano()
        expiration := time.Duration(unixTsToExpireNs + 1 - now)
        ctx := r.rclient.Context()
    
        success, err := r.rclient.SetNX(ctx, key, unixTsToExpireNs, expiration).Result()
        if err != nil {
            return err
        }
    
        if !success {
            v, err := r.rclient.Get(ctx, key).Result()
            if err != nil {
                return err
            }
            timeout, err := strconv.Atoi(v)
            if err != nil {
                return err
            }
    
            if timeout != 0 && now > int64(timeout) {
                newTimeout, err := r.rclient.GetSet(ctx, key, unixTsToExpireNs).Result()
                if err != nil {
                    return err
                }
    
                curTimeout, err := strconv.Atoi(newTimeout)
                if err != nil {
                    return err
                }
    
                if now > int64(curTimeout) {
                    // success to acquire lock with get set
                    // set the expiration of redis key
                    r.rclient.Expire(ctx, key, expiration)
                    return nil
                }
    
                return ErrRedisLockFailed
            }
    
            return ErrRedisLockFailed
        }
    
        return nil
    }
    

    Lock定义了rclient、retries、interval属性;New方法根据cnf、addrs、db、retries创建lock;LockWithRetries方法根据retries次数来尝试r.Lock(key, unixTsToExpireNs),都没有成功则返回ErrRedisLockFailed;Lock方法执行r.rclient.SetNX,如果不成功则判断是否过期,过期的话执行执行r.rclient.GetSet,若确实过期了则执行r.rclient.Expire(ctx, key, expiration)更新新的过期时间

    小结

    machinery的Lock接口定义了LockWithRetries、Lock方法;基于redis的实现则通过r.rclient.SetNX、r.rclient.GetSet、r.rclient.Expire实现。

    doc

    相关文章

      网友评论

          本文标题:聊聊machinery的Lock

          本文链接:https://www.haomeiwen.com/subject/cxykkltx.html