美文网首页
【Go】zookeeper分布式锁实现

【Go】zookeeper分布式锁实现

作者: 胖三斤66 | 来源:发表于2020-06-11 11:07 被阅读0次

    分布式锁的基本概念

    :为了实现“同一时间,只能有一个实例对共享资源进行访问”

    分布式锁:当多个分布在不同的机器上的进程竞争共享资源时,就无法使用单机锁实现同步。此时就需要使用分布式锁,而分布式锁通常需要保存(或者说依赖于)第三方,常使用的第三方有 mysql,redis 和 zookeeper。

    这里重点整理基于 zookeeper 的分布式锁。基于 zookeeper 的实现也有两种形式
    1) 临时节点
    ● 获取锁:多个实例去竞争创建同一个临时节点,因为同一个临时节点只可能存在一个,因此只会有一个实例创建成功,而创建成功的实例视为获取锁成功;其他的实例则监听该节点的变化
    ● 释放锁:利用临时节点「会话中断,节点删除」的特点,实现锁的释放。

    2) 临时顺序节点
    ● 获取锁:让多个实例各自创建一个 临时顺序节点,每次让顺序号最小的节点的实例持有分布式锁。让其他实例监听最后一个序号比它小的节点。
    ● 释放锁:同样,利用临时节点「会话中断,节点删除」的特点,实现锁的释放。

    其中,基于 zookeeper 实现的分布式锁有如下特点:

    • zk本身是强一致性,非常试合作为分布式锁

    • 实现简单,且有现成的监听通知机制,就避免不断轮询锁的状态

    • 临时节点的特性「会话中断,节点删除」,使得锁的释放更简单

    • 第一种实现方法存在一个问题「惊群」,即由于大量实例监听一个节点,当该节点删除时,所有实例都会得到响应,这种现状非常不好。因此存在大量实例竞争锁时,应该采用第二种方案更好 [1]

    实现Demo

    实现依赖的开源包

    demo 功能描述
    ● 启动三个实例竞争分布式锁,
    ● 当获得锁的实例会访问共享资源,访问完后会释放锁,并重新参与竞争

    demo 实现代码如下

    package main
    
    import (
        "fmt"
        "time"
        "github.com/docker/libkv"
        "github.com/docker/leadership"
        "github.com/docker/libkv/store"
        "github.com/docker/libkv/store/zookeeper"
        )
    
    func init() {
        // step1:对使用的接口进行注册,相当于注册驱动
        // 如果不注册一下,步骤3会失败
        zookeeper.Register()
    }
    
    func main(){
        // step2:zookeeper相关信息配置
        // zookeeper的ip:port
        zkHost := []string {"127.0.0.1:2181"}
        // 每个实例将会在这个路径下创建临时顺序节点
        path := "qconf/backup/zk_local"
        
        // step3:Create a store using pkg/store.
        client, err := libkv.NewStore("zk", zkHost, &store.Config{})
        if err != nil {
            panic(err)
        }
    
        // step4:创建一个竞争分布式锁的候选者
        // 参数一:之前创建的storeClient
        // 参数二:竞争分布式锁的路径
        // 参数三:自我标识
        // 参数四:超时时间
        candidate := leadership.NewCandidate(client, path, "underwood", 15*time.Second)
        
        // step5:执行竞争分布式锁
        // 返回的第一个通道:竞争结果
        // 返回的第二个通道:竞争时产生的错误
        electedCh, errCh := candidate.RunForElection()
    
        for{
            // step6:阻塞等待结果
            var err error
            select{
            case err = <-errCh: // 竞争过程出现出错
                fmt.Printf("err=%v\n", err)
                break
            case isElected := <- electedCh: // 竞争结果出来了
                if isElected{ // 竞争成功,即获得锁。
                    fmt.Println("i am the leader")
                    time.Sleep(20*time.Second) // 模拟访问共享数据
                    fmt.Println("i give up leadership")
                    candidate.Resign() // 释放锁,然后重新参加竞争
                }else{ // 竞争失败,即未获得锁。有一种特殊情况,任何实例竞争前都会将自己声明为follower
                    fmt.Println("i am the follower")
                }
            }
    
            if err != nil{
                break
            }
        }
    }
    

    测试结果

    • 第一个启动的实例 第一个启动的实例的结果
    • 第二个启动的实例 第二个启动的实例的结果
    • 第三个启动的实例 第三个启动的实例的结果

    PS:截取了部分结果,未截取部分才是截取部分的循环

    底层源码分析

    底层实现原理:基于 zookeeper 的临时顺序节点,实现分布式锁。
    ● 顺序号最小的节点所连接的实例,获得锁
    ● 未获得锁的实例将监听“顺序号比它小一号的节点的状态”

    竞选逻辑:candidate.RunForElection()

    竞选逻辑

    上面代码最关键的在于 “lock.Lock()”。由于本次 demo NewStore() 使用的是 zookeeper,因此 lock 对象是 zookeeperLock 实例对象。所以,接下来要看 “zookeeperLock.Lock()”实现。主要的步骤包括(具体实现见下方代码注释)

    1. 创建临时顺序节点
    2. 找出所有临时顺序节点的最小顺序号以及当前实例所连接的临时顺序节点的前一个顺序号
    3. 如果当前实例的顺序号是最小,就获得锁;反之,阻塞等待
    4. 当阻塞结束,重新判断一遍,以防止前一节点只是单纯掉线导致节点删除了
    zookeeper获取锁的逻辑

    参考:
    [1] Zookeeper实现分布式锁 - 简书

    相关文章

      网友评论

          本文标题:【Go】zookeeper分布式锁实现

          本文链接:https://www.haomeiwen.com/subject/hzovtktx.html