美文网首页工作生活
etcd分布式锁lease keepalive导致的gorout

etcd分布式锁lease keepalive导致的gorout

作者: 7亮 | 来源:发表于2019-07-05 00:46 被阅读0次

    分布式锁实现

    demo参考:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_mutex_test.go
    基本原理:在etcd事务中查询key的revision是否为0,等于0则创建key和value,表示抢锁成功;不等于0则返回最早创建该key的revision信息,表示抢锁失败。更详细的实现留个TODO。

    问题现象

    • pprof显示goroutine数量异常,查看详细调用栈看到有两处goroutine泄漏,数量都在14000多。


      1562121215406.jpg
    • 查看etcd锁的kv数量,和泄漏的goroutine数量一致。

    问题分析

    • 分析goroutine阻塞位置的代码(如下),从现象特征、注释和代码上下文初步推测加锁的key没有释放成功,goroutine一直对key的lease做keepalive操作。
    // keep the lease alive until client error or cancelled context <- 核心注释
    go func() {
        defer close(donec)
        for range keepAlive {
            // eat messages until keep alive channel closes
        }
    }()
    
    func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
        select {
        case <-donec:
            return
        case <-l.donec:
            return
        case <-ctx.Done():
        }
        //略
    }
    
    • 和etcd mutex example demo代码比对,缺少unlock操作。
    • 确认unlock能否解决问题。源码如下:
    func (m *Mutex) Unlock(ctx context.Context) error {
        client := m.s.Client()
        if _, err := client.Delete(ctx, m.myKey); err != nil {
            return err
        }
        m.myKey = "\x00"
        m.myRev = -1
        return nil
    }
    

    unlock直接将key删除,可能会出现其他服务抢到锁,临界代码再次被执行。这样能解决当前问题,但会引入其他问题。

    解决方案

    unlock

    • unlock操作是将锁删除。如果unlock失败怎么办?锁一直存在,goroutine还会泄漏!
    • 各个应用实例执行状态无法确定,某个实例unlock后会有其他应用实例lock成功的可能,会演变成临界代码的串行化执行。

    lock expire

    • 根据lock后的程序执行情况,冗余判断代码执行时间,进而设置锁存活时间,并停止对锁的续租行为,到期后key和value自动消失。
    • 如果lock后执行失败了?执行时间不一定靠谱?抢锁后的操作失败或者执行超时锁消失造成二次上锁,这些问题引发的后果是可承担的,出现概率极低,当前的业务场景下是ok的。
    • 抢锁设置超时时间,避免多服务实例goroutine执行阻塞。

    如何停止keepalive?分析源码

    • 源码中有说明注释
    • 从调用栈分析是NewSession的调用,入口示例代码:
        cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()
    
        s1, err := concurrency.NewSession(cli)  //<- 问题入口函数
        if err != nil {
            log.Fatal(err)
        }
        defer s1.Close()
        m1 := concurrency.NewMutex(s1, "/my-lock/")
    
        // acquire lock for s1
        if err := m1.Lock(context.TODO()); err != nil {
            log.Fatal(err)
        }
        fmt.Println("acquired lock for s1")
    
    • NewSession实现如下:
    // NewSession gets the leased session for a client.
    func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
        ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
        for _, opt := range opts {
            opt(ops)
        }
            
        //如果没有租约lease,则申请一个新的,其TTL是可以通过参数WithTTL设置,不设置默认defaultSessionTTL 60s。
        id := ops.leaseID
        if id == v3.NoLease {
            resp, err := client.Grant(ops.ctx, int64(ops.ttl))
            if err != nil {
                return nil, err
            }
            id = v3.LeaseID(resp.ID)
        }
    
        ctx, cancel := context.WithCancel(ops.ctx) 
        //KeepAlive参数ctx的parent来源于ops.ctx,通过此context可以cancel keepalive,停止keepalive channel。
        keepAlive, err := client.KeepAlive(ctx, id)
        if err != nil || keepAlive == nil {
            cancel()
            return nil, err
        }
    
        donec := make(chan struct{})
        s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
    
        // keep the lease alive until client error or cancelled context
        go func() {
            defer close(donec)
            for range keepAlive { //<- 阻塞点之一,因为keepalive goroutine不停续租,没有close该channel
                // eat messages until keep alive channel closes
            }
        }()
    
        return s, nil
    }
    

    sessionOption对session做参数赋值操作,类似gRPC的client grpc.DialContext的代码。将结构字段赋值做成可变参数,做到针对性的定制化,并提供了WithTTL WithLease WithContext三种参数选项,分别设置新申请租约的TTL、设置已有租约、设置context。如果不熟悉context请先熟悉context的parent canel机制。

    太累了,不写了。看一下goroutine执行图,有留言我再写。

    lease keepalive执行图

    etcd v3 lease keepalve.jpg
    • goroutine E是专门负责在keepalive失败或者取消后,清理keepalive所持有数据对象资源的。D是专门负责在keepalive失败或者取消后,通知上层goroutine。因此goroutine D和E阻塞。
    • goroutine A在stream通道中周期性发送lease保活心跳,etcd接收到心跳request后对lease renew,然后向goroutine B发送response,返回TTL。如果TTL>0,则更新下一次发送保活心跳的时间和lease过期时间,并发送lease alive的message给到goroutine D,形成keepalive forever的模型,goroutine D和E保持阻塞。如果TTL<=0,则调用keepalive关闭模块,通知goroutine D和E进行结束。
    • goroutine C在周期性计算keepalive是否到期,到期删除释放相关资源。主要职责是防止服务端宕掉、网络问题或其他异常问题导致lease已经消亡而客户端还认为lease存活的假象,采取过期机制。
    • 在goroutine E中增加timeout的context,到时间自动关闭context done channel,进而结束D和E goroutine。
    • 实际代码更加复杂,更多channel之间的通信,暂时不画全貌。

    相关文章

      网友评论

        本文标题:etcd分布式锁lease keepalive导致的gorout

        本文链接:https://www.haomeiwen.com/subject/kvgdhctx.html