分布式锁

作者: Jerry_06ed | 来源:发表于2019-05-17 23:32 被阅读24次

    简述

    使用分布式锁的目的:

    • 提高效率:使用分布式锁避免不同节点重复性的操作,比如:推送、定时任务等
    • 保证正确性:避免多个节点破坏操作的正确性,比如多个节点同时更新共享资源的问题。

    分布式锁需要的特性:

    • 互斥性
    • 可重入
    • 锁超时
    • 阻塞、非阻塞
    • 公平、非公平锁

    Mysql方式

    我们可以通过数据库的来实现锁,具体的实现方式如下:

    互斥锁

    首先建立一张表用来存储锁信息,表结构如下:

    CREATE TABLE `tb_lock` (
        `id` bigint(20) NOT NULL COMMENT '主键',
            `key` VARCHAR(100) not null COMMENT '锁持锁对象的标识',
            `resource_id` bigint(20) not null comment '竞争的资源id',
            `count` int(10) not null comment '锁重入次数',
            `insert_time` datetime not null comment '插入时间',
            `update_time` datetime not null comment '更新时间',
             PRIMARY key (`id`),
             UNIQUE(`resource_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 comment '锁';
    

    获取锁代码:

        private boolean acquire(long resourceId) throws Exception {
            String key = String.format("%s-%s",getUUID(),Thread.currentThread().getId());
            List<Lock> locks = daoHelper.sql.getListBySQL(Lock.class,"resource_id="+resourceId);
            if(CollectionUtils.isNotEmpty(locks)){
                Lock lock = locks.get(0);
                if(key.equals(lock.getKey())){
                    lock.setCount(lock.getCount()+1);
                    daoHelper.sql.upateEntity(lock);
                    return true;
                }else{
                    return false;
                }
            }
            return insert(resourceId,key);
        }
    

    释放锁的代码:

        private int release(long resourceId) throws Exception {
            String key = String.format("%s-%s",getUUID(),Thread.currentThread().getId());
            List<Lock> locks = daoHelper.sql.getListBySQL(Lock.class,"resource_id="+resourceId);
            if(CollectionUtils.isNotEmpty(locks)){
                Lock lock = locks.get(0);
                if(key.equals(lock.getKey())){
                    if(lock.getCount()==1){
                        daoHelper.sql.deleteByID(Lock.class,lock.getId());
                    }else{
                        lock.setCount(lock.getCount()-1);
                        daoHelper.sql.upateEntity(lock);
                        return lock.getCount();
                    }
                }else{
                    return 0;
                }
            }
            return 0;
        }
    

    阻塞的实现:

        /**
         * 不断的重试。
         */
        public boolean tryLock(long resourceId,long time) throws Exception {
            while(true){
                long endTime = System.currentTimeMillis() + time;
                if(acquire(resourceId)){
                    return true;
                }
                if(endTime>System.currentTimeMillis()){
                    return false;
                }
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            }
        }
        /**
         * 重试
         */
        public boolean lock(long resourceId) throws Exception {
            while(true){
                if(acquire(resourceId)){
                    return true;
                }
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            }
        }
    

    思路:

    1. 通过key字段标识持有锁的进程和线程。
    2. 通过count字段记录锁重入的次数。
    3. 通过循环加线程挂起的方式实现阻塞。

    小结

    一般我们很少会使用数据库实现分布式,如果数据本身存储在Mysql中且有资源竞争的话,可以采用事物或者乐观锁来解决。

    Zookeeper方案

    ZooKeeper是以Paxos算法为基础的分布式应用程序协调服务。它的数据结构和文件目录类似,它的数据节点类型包括四种类型:

    • 持久化节点
    • 持久化顺序节点
    • 临时节点
    • 临时顺序节点

    互斥锁实现

    我们可以利用它的临时顺序性节点和它的watch机制实现分布式锁,如下图所示:


    image

    上图所示,client1现在持有锁。zookeeper获取锁的整个过程:

    1. 首先选取一个目录(如/root/lock/)用作实现分布式锁的根目录。
    2. 客户端尝试在资源目录下创建临时顺序节点。
    3. 然后,尝试获取竞争的资源目录下的所有子节点。
    4. 判断子节点中序号最小的节点是不是自己所创建的,如果是的话则获取锁成功。
    5. 如果没有获取锁成功,则在/root/lock/目录上添加watcher,监听其子节点是否有新增或者删除。
    6. 如果字节点发生改变,重复3步骤。

    zookeeper也可以理解为一个小型的数据库,它的每个节点都可以存储数据,默认存储数据大小为1M。所以我们可以在每个节点上存竞争锁的线程标识,重入次数等信息,用来实现锁重入等功能。

    上述过程中有一个问题:所有客户端都把watcher注册到资源节点上,一旦资源节点的子节点发生改变,会通知所有的客户端,但实际上能够成功获取锁的客户端只有一个,就是序号最小的那个节点。这样会造成网络资源的浪费,严重时可能会打满网卡,这种现象在zookeeper使用中称作“羊群效应”。

    为了解决上述问题,采用最小监听的模式,每个客户端只对排在它前面的节点注入watcher,改进后如下图所示:


    image

    读写锁实现

    读写锁的实现逻辑跟互斥锁差不多,针对读写节点我们可以在其节点名字前增加前缀进行区分。如下图所示:


    image
    1. client1和client2获取了读锁。
    2. client3需要获取写锁,所以对client2进行了监听;client4也需要获取写锁,但是它的前面有一个节点也需要获取写锁,所以它监听client3节点
    3. client5节点是为了获取读锁,虽然锁目前被读锁占领,但是它的请求比client4和client5晚,为了保证避免‘锁饥饿’现象的发生,它需要等待前面的client3和client4释放锁之后才能获取读锁,所以它对client4节点进行监听。

    curator

    Curator是Netflix公司开源的一个Zookeeper客户端,相比较Zookeeper原生的客户端,它提供了更人性化的API风格,同时它也提供了一些工具类的封装,比如上述的互斥锁和读写锁它都有实现,如下所示:

    • InterProcessMutex 实现可重入互斥锁
    • InterProcessReadWriteLock 实现读写锁

    小结

    Zookeeper的数据特点比较容易实现CLH(Craig, Landin, and Hagersten)队列,可以很轻松实现公平锁。它的session通过周期性的keepAlive来维护,一旦session失效后,临时节点会在Zookeeper中删除,从而避免了因为客户端宕机而导致的死锁。Zookeeper本身属于CP型的,在数据一致性方面做的很好,所以使用它实现分布式锁比较安全。但是使用它需要维护额外的Zookeeper集群,使用成本上相对来说高一些。

    Redis方案

    setnx版本

    使用Redis实现分布式锁,最普遍的做法是使用setNX指令,为了避免客户端宕机导致死锁,通常在加锁成功之后对key设置过期时间,如下所示:

        Long result = jedis.setnx(lockKey, requestId);
        if (result!=null&&result == 1) {
            jedis.expire(lockKey, expireTime);
        }
    

    但是上述过程有问题,setNX和expire是分开的两步操作,整个加锁过程不是原子性的,如果在setNx后客户端宕机了,也会导致死锁。

    在Redis 2.6.12 版本开始, SET命令提供了多个参数,如NX,EX等。setNX和expire操作可以使用一条指令完成:

        //value中可以保存线程标识。
        jedis.set(lockKey, requestId, "NX", "EX", expireTime);
    

    锁的释放操作,可以简单的使用删除key的方式进行释放,但是这样做不安全,如果执行时间大于过期时间的话,锁可能会被勿删。举个例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。如果仅使用DEL命令将key删除,那么这种情况就会把客户端B的锁给删除掉。比较安全的做法是采用lua脚本,告诉redis只有key存在且其value等于指定的值才删除成功,lua脚本如下所示:

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    可以使用jedis执行lua脚本:

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (1L.equals(result)) {
            return true;
        }
        return false;
    

    使用setNX只能实现简单的互斥操作,如果想要实现更复杂的功能可以使用Redisson。

    Redisson版本

    Redisson提供了丰富锁工具和线程同步工具:

    • RLock:互斥、公平/非公平、可重入锁
    • MultiLock:联合锁
    • RedLock:红锁
    • ReadWriteLock:读写锁
    • Semaphore:信号量
    • PermitExpirableSemaphore:支持过期的信号量
    • CountDownLatch:栅栏

    RLock

    RLock锁的特点:互斥可重入锁,支持公平/非公平模式。RLock实现了java的Lock接口,它的用法和ReentrantLock一样,如下所示:

    RLock lock = redisson.getLock("anyLock");
    RLock fairLock = redisson.getFairLock("anyLock");
    // Most familiar locking method
    lock.lock();
    lock.unlock();
    

    它使用lua脚本实现,先来看它的非公平模式下获取锁流程:

    • KEYS[1] :需要加锁的key,这里需要是字符串类型。
    • ARGV[1] :锁的超时时间,防止死锁
    • ARGV[2] :锁的唯一标识,id(UUID.randomUUID()) + “:” + threadId
    //不存在初始化为1
    if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('hset', KEYS[1], ARGV[2], 1); 
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil; 
    end; 
    //如果是重入,value则加1
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)then 
        redis.call('hincrby', KEYS[1], ARGV[2], 1); 
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil; 
    end; 
    return redis.call('pttl', KEYS[1]);
    

    锁的状态采用hash结构存储,hash的item为线程标识,value为锁的重入次数,同时lock接口中还订阅解锁的channel,用于竞争锁资源。

    它解锁的流程:

    • KEYS[1] :需要加锁的key,这里需要是字符串类型。
    • KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”
    • ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
    • ARGV[2] :锁的超时时间,防止死锁
    • ARGV[3] :锁的唯一标识, id(UUID.randomUUID()) + “:” + threadId
    if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('publish', KEYS[2], ARGV[1]); 
        return 1; 
    end; 
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
        return nil; 
    end; 
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
    if (counter > 0) then 
        redis.call('pexpire', KEYS[1], ARGV[2]);
        return 0;
    else redis.call('del', KEYS[1]);
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1;
    end;
    return nil;
    

    lock接口源码:

        public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            Long ttl = this.tryAcquire(leaseTime, unit, threadId);
            if (ttl != null) {
                //订阅对应key的channel。
                RFuture<RedissonLockEntry> future = this.subscribe(threadId);
                this.commandExecutor.syncSubscription(future);
                try {
                    while(true) {
                        //尝试执行lua脚本,去获取锁。
                        ttl = this.tryAcquire(leaseTime, unit, threadId);
                        if (ttl == null) {
                            return;
                        }
                        //如果ttl大于0,则最大等待ttl时间,再去尝试获取锁。
                        if (ttl >= 0L) {
                            //此处采用信号量实现的,如果没有锁释放,则阻塞等待通知。
                            this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } else {
                            this.getEntry(threadId).getLatch().acquire();
                        }
                    }
                } finally {
                    this.unsubscribe(future, threadId);
                }
            }
        }
    

    整体的流程图如下:


    image

    下列是公平锁获取锁的lua脚本:

    • KEYS[1]:需要加锁的key
    • KEYS[2]:Redisson的redisson_lock_queue_key 使用list存储等待锁的客户端,相当于CLH队列,保证节点获取锁的公平性。
    • KEYS[3]:Redisson的redisson_lock_timeout_key 使用zset存储锁的等待时间和list元素的去重处理。
    • ARGV[1]: 锁的超时时间,防止死锁
    • ARGV[2]: 锁的唯一标识,id(UUID.randomUUID()) + “:” + threadId
    • ARGV[3]: currentTime+5000L(ms)
    • ARGV[4]:currentTime
    while true
    //检查redisson_lock_queue_key里面元素,删除过期的元素。
    do local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
        if firstThreadId2 == false then
            break;
        end;
        local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
        if timeout <= tonumber(ARGV[4]) then
            redis.call('zrem', KEYS[3], firstThreadId2);
            redis.call('lpop', KEYS[2]);
        else break;
        end;
    end;
    //如果key不存在,且redisson_lock_queue_key为空,或者redisson_lock_queue_key里面第一个元素是请求者,则获取锁成功,并返回null。
    if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
        redis.call('lpop', KEYS[2]);
        redis.call('zrem', KEYS[3], ARGV[2]);
        redis.call('hset', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    
    //如果判断持有锁的线程是自己,则进行锁重入处理,count++,并返回null。
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    
    //设置ttl时间,用于重试获取锁的时间。
    local firstThreadId = redis.call('lindex', KEYS[2], 0);
    local ttl;
    //获取ttl,如果队列中第一个元素不是请求者,则取第一个元素的ttl,否则取当前锁的ttl。
    if firstThreadId ~= false and firstThreadId ~= ARGV[2] then
        ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);
    else 
        ttl = redis.call('pttl', KEYS[1]);
    end;
    local timeout = ttl + tonumber(ARGV[3]);
    
    //将请求者加入队列中。
    if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
    redis.call('rpush', KEYS[2], ARGV[2]);
    end;
    return ttl;
    

    公平锁实现要比非公平锁复杂一些,为了保证公平锁的公平性,维护了一个list结构,当做FIFO队列,锁争用发生时,只有list中的第一个元素能去获取锁。同时还维护了一个zset结构,用于存储每个客户端获取锁的超时时间,避免list中存在大量失效的数据,同时也可以用来对list进行去重。

    公平锁的释放逻辑和非公平锁也不一样,具体代码细节,我还没有去看,感兴趣同学可以自己去翻阅Redisson的源码。

    使用Redis锁安全问题

    如果Redis挂了怎么办?你可能会说,可以通过增加一个slave节点解决这个问题。但这通常是行不通的。这样做,我们不能实现资源的独享,因为Redis的主从同步通常是异步的。

    在这种场景(主从结构)中存在明显的竞态:

    • 客户端A从master获取到锁
    • 在master将锁同步到slave之前,master宕掉了。
    • slave节点被晋级为master节点
    • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!

    为了解决这个问题,Redis的作者提出了一种解决方案:RedLock。

    RedLock

    它的基本思想是使用多个Redis实例,比如说使用5个独立的Redis实例,让他们部署在不同的机器上,避免同时宕掉,然后使用五个Redis分别去获取锁,如果超过一半的机器获取成功则获取锁成功,下面是客户端获取锁的操作步骤:

    1. 获取当前Unix时间,以毫秒为单位。
    2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
    3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
    4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
    5. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

    释放锁比较简单,向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.

    Redisson提供了RedLock的实现

        RedissonClient client1 = Redisson.create();
        RedissonClient client2 = Redisson.create();
        RLock lock1 = client1.getLock("lock1");
        RLock lock2 = client1.getLock("lock2");
        RLock lock3 = client2.getLock("lock3");
        RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
        lock.lock();
        lock.unlock();
        client1.shutdown();
        client2.shutdown();
    

    有关RedLock的争论

    关于RedLock是否安全的问题,Martin Kleppmann和Redis之父Antirez有过激烈的讨论。Martin Kleppmann是剑桥大学分布式系统研究员,他写了一本书《Designing Data-Intensive Applications》,他对RedLock算法提出了质疑,认为RedLock算法没卵用,下面是它原文给出的结论:

    I think the Redlock algorithm is a poor choice because it is “neither fish nor fowl”: it is unnecessarily heavyweight and expensive for efficiency-optimization locks, but it is not sufficiently safe for situations in which correctness depends on the lock.

    Martin认为分布式锁普遍存在一个问题:


    image

    如果Client1在获取锁之后,由于GC-STW导致线程阻塞时间大于锁的过期时间,锁被提前释放掉了。这时如果Client2也过来获取锁,也能够获取成功,造成了错误。

    这种情况并不只是理论上的,现实中有真实的案例。HBase曾经就遇到过这个问题:https://www.slideshare.net/enissoz/hbase-and-hdfs-understanding-filesystem-usage

    除了上述的GC-STW,还有可能别的原因造成上述问题。比如说在真正执行更新操作之前,需要网络请求外部资源或者读取文件等操作,也会发生阻塞,造成同样的问题。

    作者给出了一种解决上述问题的办法,如下图所示:


    image
    • 锁服务需要一个递增的token
    • 客户端获取锁成功后需要使用这个token进行更新资源
    • 资源数据库,更新时需要对token进行检查,如果token小于等于已保存的token,则拒绝更新。

    上述的整个过程有点类似CAS。如果使用Zookeeper实现分布式锁的话可以使用其事物id(zxid)或者node的版本号实现token。

    对于RedLock而言,因为它的底层采用多个Redis实例,它没法生成递增的token,所以无法用token的方式解决作者描述的问题。Martin认为RedLock除了不能生成递增token外,还有其它的问题,因为RedLock是建立在一些假设上的:

    1. 假设所有Redis节点在密钥过期之前持有密钥的时间大约是正确的;
    2. 网络延时与失效时间相比较小;
    3. 而且该进程暂停的时间比过期时间短得多。

    举个例子:假设系统有五个Redis节点(A、B、C、D和E)和两个客户机(clien1和client2)

    1. Client1获取节点A、B、C上的锁。由于网络问题,无法到达D和E。
    2. 节点C上的时钟向前跳转,导致锁过期。
    3. Client2获取节点C、D、E上的锁,由于网络问题,无法到达A和B。
    4. Client1 和 Client2现在都持有了锁。

    步骤2的原因是因为Redis的过期策略是建立在当前物理机的时钟上的,如果物理机的时间发生跳转,那么key的过期时间也会更变。如果C在将锁持久化到磁盘之前崩溃,并立即重启,也会发生类似的问题。出于这个原因,Redlock文档建议延迟重启崩溃节点的时间,至少要达到最长锁的生存时间。但是这种又产生了一种前提,需要保证重启的延时时间大于锁的生存时间。

    Martin觉得前面这个时钟跳跃的例子还不够,又给出了一个由客户端GC pause引发Redlock失效的例子。如下:

    • Client1向Redis节点A, B, C, D, E发起锁请求。
    • 各个Redis节点已经把请求结果返回给了Client端1,但Client1在收到请求结果之前进入了长时间的GC pause。
    • 在所有的Redis节点上,锁过期了。
    • Client2在A, B, C, D, E上获取到了锁。
    • Client1从GC pause从恢复,收到了前面第2步来自各个Redis节点的请求结果。客户端1认为自己成功获取到了锁。
    • Client1和Client2现在都认为自己持有了锁。

    Martin给的这个例子其实有一些问题,因为RedLcok在获取锁之后,会拿获取锁的时间和锁的过期时间进行比较,如果A、B、C、D、E都过期了,那么RedLock实际上是获取锁失败。关于这一点,Redis之父在他的反驳文中也进行了阐述。

    最后,Martin得出了如下的结论:

    • 如果是为了效率(efficiency)而使用分布式锁,允许锁的偶尔失效,那么使用单Redis节点的锁方案就足够了,简单而且效率高。Redlock则是个过重的实现(heavyweight)。
    • 如果是为了正确性(correctness)在很严肃的场合使用分布式锁,那么不要使用Redlock。它不是建立在异步模型上的一个足够强的算法,它对于系统模型的假设中包含很多危险的成分(对于timing)。而且,它没有一个机制能够提供token。那应该使用什么技术呢?Martin认为,应该考虑类似Zookeeper的方案,或者支持事务的数据库。

    有关他们之间的争论,我没有做过多的分析,网上有一篇文章总结的很好:https://www.jianshu.com/p/dd66bdd18a56

    总结

    具体使用哪一种分布式锁最好呢?没有最好的,应该根据不同的业务场景选取最合适的算法。Martin给出的结论值得参考,如果使用分布式锁是为了效率问题,那么直接使用Redis的setNX实现就可以了。如果想要保证正确性,推荐使用Zookeeper或者支持事务的数据库。如果想要使用分布式锁实现一些比较复杂的功能且能够接受小概率的不准确性,可以采用Reisson提供的锁工具。

    参考资料

    相关文章

      网友评论

        本文标题:分布式锁

        本文链接:https://www.haomeiwen.com/subject/fmaaaqtx.html