美文网首页工作生活
etcd分布式锁lease keepalive导致的gorout

etcd分布式锁lease keepalive导致的gorout

作者: 7亮 | 来源:发表于2019-07-05 00:46 被阅读0次

分布式锁实现

demo参考:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_mutex_test.go
基本原理:在etcd事务中查询key的revision是否为0,等于0则创建key和value,表示抢锁成功;不等于0则返回最早创建该key的revision信息,表示抢锁失败。更详细的实现留个TODO。

问题现象

  • pprof显示goroutine数量异常,查看详细调用栈看到有两处goroutine泄漏,数量都在14000多。


    1562121215406.jpg
  • 查看etcd锁的kv数量,和泄漏的goroutine数量一致。

问题分析

  • 分析goroutine阻塞位置的代码(如下),从现象特征、注释和代码上下文初步推测加锁的key没有释放成功,goroutine一直对key的lease做keepalive操作。
// keep the lease alive until client error or cancelled context <- 核心注释
go func() {
    defer close(donec)
    for range keepAlive {
        // eat messages until keep alive channel closes
    }
}()
func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
    select {
    case <-donec:
        return
    case <-l.donec:
        return
    case <-ctx.Done():
    }
    //略
}
  • 和etcd mutex example demo代码比对,缺少unlock操作。
  • 确认unlock能否解决问题。源码如下:
func (m *Mutex) Unlock(ctx context.Context) error {
    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

unlock直接将key删除,可能会出现其他服务抢到锁,临界代码再次被执行。这样能解决当前问题,但会引入其他问题。

解决方案

unlock

  • unlock操作是将锁删除。如果unlock失败怎么办?锁一直存在,goroutine还会泄漏!
  • 各个应用实例执行状态无法确定,某个实例unlock后会有其他应用实例lock成功的可能,会演变成临界代码的串行化执行。

lock expire

  • 根据lock后的程序执行情况,冗余判断代码执行时间,进而设置锁存活时间,并停止对锁的续租行为,到期后key和value自动消失。
  • 如果lock后执行失败了?执行时间不一定靠谱?抢锁后的操作失败或者执行超时锁消失造成二次上锁,这些问题引发的后果是可承担的,出现概率极低,当前的业务场景下是ok的。
  • 抢锁设置超时时间,避免多服务实例goroutine执行阻塞。

如何停止keepalive?分析源码

  • 源码中有说明注释
  • 从调用栈分析是NewSession的调用,入口示例代码:
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    s1, err := concurrency.NewSession(cli)  //<- 问题入口函数
    if err != nil {
        log.Fatal(err)
    }
    defer s1.Close()
    m1 := concurrency.NewMutex(s1, "/my-lock/")

    // acquire lock for s1
    if err := m1.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
    fmt.Println("acquired lock for s1")
  • NewSession实现如下:
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops)
    }
        
    //如果没有租约lease,则申请一个新的,其TTL是可以通过参数WithTTL设置,不设置默认defaultSessionTTL 60s。
    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = v3.LeaseID(resp.ID)
    }

    ctx, cancel := context.WithCancel(ops.ctx) 
    //KeepAlive参数ctx的parent来源于ops.ctx,通过此context可以cancel keepalive,停止keepalive channel。
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive { //<- 阻塞点之一,因为keepalive goroutine不停续租,没有close该channel
            // eat messages until keep alive channel closes
        }
    }()

    return s, nil
}

sessionOption对session做参数赋值操作,类似gRPC的client grpc.DialContext的代码。将结构字段赋值做成可变参数,做到针对性的定制化,并提供了WithTTL WithLease WithContext三种参数选项,分别设置新申请租约的TTL、设置已有租约、设置context。如果不熟悉context请先熟悉context的parent canel机制。

太累了,不写了。看一下goroutine执行图,有留言我再写。

lease keepalive执行图

etcd v3 lease keepalve.jpg
  • goroutine E是专门负责在keepalive失败或者取消后,清理keepalive所持有数据对象资源的。D是专门负责在keepalive失败或者取消后,通知上层goroutine。因此goroutine D和E阻塞。
  • goroutine A在stream通道中周期性发送lease保活心跳,etcd接收到心跳request后对lease renew,然后向goroutine B发送response,返回TTL。如果TTL>0,则更新下一次发送保活心跳的时间和lease过期时间,并发送lease alive的message给到goroutine D,形成keepalive forever的模型,goroutine D和E保持阻塞。如果TTL<=0,则调用keepalive关闭模块,通知goroutine D和E进行结束。
  • goroutine C在周期性计算keepalive是否到期,到期删除释放相关资源。主要职责是防止服务端宕掉、网络问题或其他异常问题导致lease已经消亡而客户端还认为lease存活的假象,采取过期机制。
  • 在goroutine E中增加timeout的context,到时间自动关闭context done channel,进而结束D和E goroutine。
  • 实际代码更加复杂,更多channel之间的通信,暂时不画全貌。

相关文章

  • etcd分布式锁lease keepalive导致的gorout

    分布式锁实现 demo参考:https://github.com/etcd-io/etcd/blob/master...

  • 使用etcd分布式锁做主备切换

    利用etcd事务机制可以实现分布式锁,利用分布式锁可以做主备切换功能。Etcd分布式锁实现方式如下:利用etcd事...

  • etcd分布式锁

    1.实现带租约(lease)的分布式锁,如果分布式租约到期,则自动释放锁。 session.Orphan()解释见...

  • 注册中心

    etcd注册中心分布式一致性系统基于raft一致性协议 etcd使用场景服务注册和发现共享配置分布式锁Leader...

  • ETCD 分布式锁

    概述 在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Sync...

  • etcd lock

    Etcd Lock详解 分布式情况下最终都会面临一个资源抢占的问题,解决问题的方法为抽象一个分布式锁,持有锁则可以...

  • 分布式租约机制

    分布式租约机制 1.什么是租约 租约(lease)在分布式中一般描述如下: Lease 是由授权者授予的在一段时间...

  • 使用docker创建etcd集群

    etcd是一个高可用的键值存储系统,场景主要是1、主要用于共享配置2、服务注册与发现3、分布式锁等etcd是由Co...

  • etcd笔记1

    etcd 是一个分布式键值对存储,设计用来可靠而快速的保存关键数据并提供访问。通过分布式锁,leader选举和写屏...

  • etcdctl查看分布式锁状态

    etcd提供分布式锁能力,经常会出现获取锁超时报错,当然这个在逻辑上是正常,一个进程持有锁在没有释放期间,其他进程...

网友评论

    本文标题:etcd分布式锁lease keepalive导致的gorout

    本文链接:https://www.haomeiwen.com/subject/kvgdhctx.html