分布式锁实现
demo参考:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_mutex_test.go
基本原理:在etcd事务中查询key的revision是否为0,等于0则创建key和value,表示抢锁成功;不等于0则返回最早创建该key的revision信息,表示抢锁失败。更详细的实现留个TODO。
问题现象
-
pprof显示goroutine数量异常,查看详细调用栈看到有两处goroutine泄漏,数量都在14000多。
1562121215406.jpg - 查看etcd锁的kv数量,和泄漏的goroutine数量一致。
问题分析
- 分析goroutine阻塞位置的代码(如下),从现象特征、注释和代码上下文初步推测加锁的key没有释放成功,goroutine一直对key的lease做keepalive操作。
// keep the lease alive until client error or cancelled context <- 核心注释
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()
func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
select {
case <-donec:
return
case <-l.donec:
return
case <-ctx.Done():
}
//略
}
- 和etcd mutex example demo代码比对,缺少unlock操作。
- 确认unlock能否解决问题。源码如下:
func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}
unlock直接将key删除,可能会出现其他服务抢到锁,临界代码再次被执行。这样能解决当前问题,但会引入其他问题。
解决方案
unlock
- unlock操作是将锁删除。如果unlock失败怎么办?锁一直存在,goroutine还会泄漏!
- 各个应用实例执行状态无法确定,某个实例unlock后会有其他应用实例lock成功的可能,会演变成临界代码的串行化执行。
lock expire
- 根据lock后的程序执行情况,冗余判断代码执行时间,进而设置锁存活时间,并停止对锁的续租行为,到期后key和value自动消失。
- 如果lock后执行失败了?执行时间不一定靠谱?抢锁后的操作失败或者执行超时锁消失造成二次上锁,这些问题引发的后果是可承担的,出现概率极低,当前的业务场景下是ok的。
- 抢锁设置超时时间,避免多服务实例goroutine执行阻塞。
如何停止keepalive?分析源码
- 源码中有说明注释
- 从调用栈分析是NewSession的调用,入口示例代码:
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
s1, err := concurrency.NewSession(cli) //<- 问题入口函数
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
- NewSession实现如下:
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
for _, opt := range opts {
opt(ops)
}
//如果没有租约lease,则申请一个新的,其TTL是可以通过参数WithTTL设置,不设置默认defaultSessionTTL 60s。
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = v3.LeaseID(resp.ID)
}
ctx, cancel := context.WithCancel(ops.ctx)
//KeepAlive参数ctx的parent来源于ops.ctx,通过此context可以cancel keepalive,停止keepalive channel。
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}
donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
// keep the lease alive until client error or cancelled context
go func() {
defer close(donec)
for range keepAlive { //<- 阻塞点之一,因为keepalive goroutine不停续租,没有close该channel
// eat messages until keep alive channel closes
}
}()
return s, nil
}
sessionOption对session做参数赋值操作,类似gRPC的client grpc.DialContext
的代码。将结构字段赋值做成可变参数,做到针对性的定制化,并提供了WithTTL
WithLease
WithContext
三种参数选项,分别设置新申请租约的TTL、设置已有租约、设置context。如果不熟悉context请先熟悉context的parent canel机制。
太累了,不写了。看一下goroutine执行图,有留言我再写。
lease keepalive执行图
etcd v3 lease keepalve.jpg- goroutine E是专门负责在keepalive失败或者取消后,清理keepalive所持有数据对象资源的。D是专门负责在keepalive失败或者取消后,通知上层goroutine。因此goroutine D和E阻塞。
- goroutine A在stream通道中周期性发送lease保活心跳,etcd接收到心跳request后对lease renew,然后向goroutine B发送response,返回TTL。如果TTL>0,则更新下一次发送保活心跳的时间和lease过期时间,并发送lease alive的message给到goroutine D,形成keepalive forever的模型,goroutine D和E保持阻塞。如果TTL<=0,则调用keepalive关闭模块,通知goroutine D和E进行结束。
- goroutine C在周期性计算keepalive是否到期,到期删除释放相关资源。主要职责是防止服务端宕掉、网络问题或其他异常问题导致lease已经消亡而客户端还认为lease存活的假象,采取过期机制。
- 在goroutine E中增加timeout的context,到时间自动关闭context done channel,进而结束D和E goroutine。
- 实际代码更加复杂,更多channel之间的通信,暂时不画全貌。
网友评论