etcd lock

作者: 九日火 | 来源:发表于2021-05-17 20:01 被阅读0次

    Etcd Lock详解

    分布式情况下最终都会面临一个资源抢占的问题,解决问题的方法为抽象一个分布式锁,持有锁则可以操作资源,本文使用etcd实现一个分布式锁

    简介

    在正式使用之前,先简单介绍一下etcd的clientv3,etcd的client和服务器通信在v3版本为grpc,所以我们在使用时实际上就是使用grpc和服务器通信.

    etcd的github地址为: https://github.com/etcd-io/etcd

    在github中源代码中存在两个目录:

    $ tree -L 1
    ├── alarm
    .....
    ├── client
    ├── clientv3
    .....
    
    

    其中 clientv3为我们需要使用的client,本文的主角就是clientv3/concurrency

    下载依赖

    在明确我们使用的包后,我们直接依赖clientv3包到我们的工程中.

    go get github.com/etcd-io/etcd/clientv3
    
    

    如果需要梯子,请参考: 终端中设置代理

    连接到etcd

    在很多环境中我们启动etcd都是通过配置tls方式进行的,所以在连接etcd的时候需要使用tls的方式连接(可是百度上很多文章居然都没写.),具体的连接方式如下:

        tlsInfo := transport.TLSInfo{
            CertFile:      "etcd-v3.3.12-linux-amd64/etcd.pem",
            KeyFile:       "etcd-v3.3.12-linux-amd64/etcd-key.pem",
            TrustedCAFile: "etcd-v3.3.12-linux-amd64/ca.pem",
        }
        tlsConfig, err := tlsInfo.ClientConfig()
        if err != nil {
            log.Fatal(err)
        }
        config := clientv3.Config{
            Endpoints: []string{"127.0.0.1:12379"},#此处为etcd server监听地址
            TLS:       tlsConfig,
        }
        client, e := clientv3.New(config)
        if e != nil {
            log.Fatal(e.Error())
        }
        defer client.Close()
    
    

    使用Mutex获取锁

    在使用证书连接到etcd的server后,我们使用clientv3/concurrency包中的Mutex进行分布式锁

        # 生成一个30s超时的上下文
        timeout, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancelFunc()
        # 获取租约
        response, e := client.Grant(timeout, 30)
        if e != nil {
            log.Fatal(e.Error())
        }
        # 通过租约创建session
        session, e := concurrency.NewSession(client, concurrency.WithLease(response.ID))
        if e != nil {
            log.Fatal(e.Error())
        }
        defer session.Close()
        # 通过session和锁前缀
        mutex := concurrency.NewMutex(session, "/lock")
        e = mutex.Lock(timeout)
        if e != nil {
            log.Fatal(e.Error())
        }
    
        # 业务逻辑
    
        # 释放锁
        defer mutex.Unlock(timeout)
    
    

    整体流程还是相对复杂的,接下来我们将一点一点解析:

    1.生成30s超时的上下文(context)

    锁的竞争是有时间的,不可能一直竞争下去,设定一个30s的超时时间,是让mutex.Lock()的时候有一个获取锁的时间,在此时间内,如果没有获取到锁则应该重试或者提示失败.

    2.client.Grant获取租约

    租约设置了一个和上下文一样的超时时间,保证租约有足够的时间

    3.根据租约创建一个session回话,维护租约的过期时间

    在client中抽象出了一个session对象来持续保持租约不过期,具体源码为:

    //会话表示在客户端的生存期内保持活动的租约。
    //应用程序可能会使用会话来解释活动性。
    type Session struct {
        client *v3.Client
        opts   *sessionOptions
        id     v3.LeaseID
    
        cancel context.CancelFunc
        donec  <-chan struct{}
    }
    
    // NewSession gets the leased session for a client.
    func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
        ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
        for _, opt := range opts {
            opt(ops)
        }
    
        id := ops.leaseID
        if id == v3.NoLease {
            resp, err := client.Grant(ops.ctx, int64(ops.ttl))
            if err != nil {
                return nil, err
            }
            id = v3.LeaseID(resp.ID)
        }
    
        ctx, cancel := context.WithCancel(ops.ctx)
        keepAlive, err := client.KeepAlive(ctx, id)
        if err != nil || keepAlive == nil {
            cancel()
            return nil, err
        }
    
        donec := make(chan struct{})
        s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
        //在客户端错误或取消上下文之前保持租约的活动状态
        // keep the lease alive until client error or cancelled context
        go func() {
            defer close(donec)
            for range keepAlive {
                //在保持活动频道关闭前接收信息
                // eat messages until keep alive channel closes
            }
        }()
    
        return s, nil
    }
    
    

    其实就是在newSession中启动一个协程不断读取keepAlive这个channel的数据

    4.concurrency.NewMutex创建一个锁,调用lock开始竞争锁

    // Lock locks the mutex with a cancelable context. If the context is canceled
    // while trying to acquire the lock, the mutex tries to clean its stale lock entry.
    func (m *Mutex) Lock(ctx context.Context) error {
        s := m.s
        client := m.s.Client()
    
        //生成锁的key
        m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
        //使用事务机制
        //比较key的revision为0(0标示没有key)
        cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
        //则put key,并设置租约
        // put self in lock waiters via myKey; oldest waiter holds lock
        put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
        //否则 获取这个key,重用租约中的锁(这里主要目的是在于重入)
        //通过第二次获取锁,判断锁是否存在来支持重入
        //所以只要租约一致,那么是可以重入的.
        // reuse key in case this session already holds the lock
        get := v3.OpGet(m.myKey)
        //通过前缀获取最先创建的key
        // fetch current holder to complete uncontended path with only one RPC
        getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
        resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
        if err != nil {
            return err
        }
        //获取到自身的revision(注意,此处CreateRevision和Revision不一定相等)
        m.myRev = resp.Header.Revision
        if !resp.Succeeded {
            m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
        }
        // 通过对比自身的revision和最先创建的key的revision得出谁获得了锁
        // 例如 自身revision:5,最先创建的key createRevision:3  那么不获得锁,进入waitDeletes
        //     自身revision:5,最先创建的key createRevision:5  那么获得锁
    
        // if no key on prefix / the minimum rev is key, already hold the lock
        ownerKey := resp.Responses[1].GetResponseRange().Kvs
        if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
            m.hdr = resp.Header
            return nil
        }
        // 等待其他程序释放锁,并删除其他revisions
        // wait for deletion revisions prior to myKey
        hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
        // release lock key if wait failed
        if werr != nil {
            m.Unlock(client.Ctx())
        } else {
            m.hdr = hdr
        }
        return werr
    }
    // 等待持有锁的key删除
    // 内部实现为等其他所有比当前createRevision小的key,监听删除事件
    // waitDeletes efficiently waits until all keys matching the prefix and no greater
    // than the create revision.
    func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
        //WithLastCreate 按照CreateRevision排序,降序 例如 5 4 3 2 1 
        //WithMaxCreateRev 获取比maxCreateRev小的key
        getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
        for {
            resp, err := client.Get(ctx, pfx, getOpts...)
            if err != nil {
                return nil, err
            }
            if len(resp.Kvs) == 0 {
                return resp.Header, nil
            }
            lastKey := string(resp.Kvs[0].Key)
            //监听删除事件
            if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
                return nil, err
            }
        }
    }
    //从revision开始监听删除事件,因为revision存在,所以也避免了ABA问题
    func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
        cctx, cancel := context.WithCancel(ctx)
        defer cancel()
    
        var wr v3.WatchResponse
        wch := client.Watch(cctx, key, v3.WithRev(rev))
        for wr = range wch {
            for _, ev := range wr.Events {
                //遇到删除事件才返回
                if ev.Type == mvccpb.DELETE {
                    return nil
                }
            }
        }
        if err := wr.Err(); err != nil {
            return err
        }
        if err := ctx.Err(); err != nil {
            return err
        }
        return fmt.Errorf("lost watcher waiting for delete")
    }
    
    

    CreateRevision: 创建时的revision

    Header.Revision: etcd server现在的revision

    ABA问题: CAS和ABA问题

    5.释放锁

    这里释放锁,比较简单,就是删除此key

    //释放锁
    func (m *Mutex) Unlock(ctx context.Context) error {
        client := m.s.Client()
        if _, err := client.Delete(ctx, m.myKey); err != nil {
            return err
        }
        m.myKey = "\x00"
        m.myRev = -1
        return nil
    }
    
    

    revision

    刚才说了那么多revision,这里我们需要详细的了解一下etcd中的mvcc多版本机制

    那么我们先来明确几个概念:

    main ID:在etcd中每个事务的唯一id,全局递增不重复.

    sub ID: 在事务中的连续多个修改操作会从0开始编号,这个编号就是sub ID

    revision: 由(mainID,subID)组成的唯一标识

    所以现在我们的revision就是这么来的啦…

    相关文章

      网友评论

          本文标题:etcd lock

          本文链接:https://www.haomeiwen.com/subject/lsgjjltx.html