分布式锁

作者: 知止9528 | 来源:发表于2019-01-22 21:45 被阅读56次

    前面小结过java锁,这次在来总结下分布式锁

    Java锁如下

    Java锁.jpg

    分布式锁方案

    • 基于数据库
    • 基于Redis
    • 基于Zookeeper

    三个基本要求

    阻塞或者非阻塞?
    一般都需要支持重入
    公平还是非公平?公平的话就需要排队

    基于数据库

    在查询语句的后面加上for update,数据库就会在查询过程中给数据库表增加排他锁(注意:InnoDB的行级锁是加在索引上的,所以需要给字段加上索引,而且最好是唯一索引)

    有几个问题

    • 走索引才会走到行级锁,而Mysql会对查询进行优化,根据执行计划的代价来判断是走全表扫描还是走索引,如果走全表扫描的话那就是表锁了
    • 会占用数据库连接,如果排他锁长时间不提交,会导致连接越来越多,有可能把连接池撑爆

    Redis锁

    基于Redis的原理呢,就是因为它是单线程的,也就是多个线程的命令走到redis这里也需要根据先后顺序进行排队.

    为了避免死锁,需要设置超时时间

    那这里就有几个问题?
    1.Redis的setnx()和expire()是两个方法,也即不是一个原子操作
    2.这个超时时间往往跟业务执行时间有关,有可能会设置得过长或者过短

    第一种方案
    步骤如下

    setnx(lockkey,1) 返回0,则说明占位失败,如果成功,则说明占位成功
    expire()对lockkey设置过期时间
    delete(lockkey)  执行完成后,删除key
    

    我们前面分析过,setnx()和expire()不是原子操作,即第一个成功了,第二个失败了
    当然有一种解决方案是只使用setnx()而不用使用expire()方法了,就是setnx()的时候把过期时间当做值设置进去,然后获取锁的时候,根据值来判断是否过期,但这样也会有一个问题就是全局时钟不一致的问题,就即多台机器上的系统时间可能有差异,也即我判断超时的标准根本就不一样

    其次就是脑裂问题即A先获取到了锁,但还没执行完就宕机了.然后B获取到了锁,然后A这个时候又活过来了,此时A和B都拥有同一把锁.
    针对这个的解决方案都是采用租约机制,即A有一个任期,B有一个任期,B的任期比A的大,那么就算A活过来了由于任期比较小,也需要重写去获取锁

    此外,delete()操作也有问题,需要判断是不是锁的拥有者才能进行相应的删除


    第二种方案
    set(key,value,nx,expire,time)

    依然没有解决过期时间到底设置多长的问题


    第三种方案
    基于Redission
    redission是redis官方的分布式锁组件
    GitHub地址为
    https://github.com/redission/redisson

    那么它是怎么来解决这个超时时间到底设置多长?

    每获得一个锁时,只设置一个很短的超时时间,同时开启一个线程在每次超时时间快到的时候去轮询下业务是否做完了来决定是不是要延长超时时间

    当然redission提供了一整套锁的方案,感兴趣的可以自行查看


    基于Zookeeper

    *zk一般有多个节点组成(2n+1),采用zab一致性协议.对其内部数据进行修改时会自动同步到所有节点

    *zk有四种节点类型,持久性节点,持久有序阶段,临时节点,临时有序节点,临时节点会在宕机会进行删除

    *watch机制,client端可以监控每个节点的变化,当产生变化时会给client产生一个事件

    zk锁的基本原理

    每个获取锁的操作都会创建一个临时有序的节点,只有序号最小的可以拥有锁,如果节点不是最小的话则使用watch机制监听前一个节点(避免羊群效应,是不是和AQS优点像?)

    具体可见前面对zk原理的分析文章

    最后再来分析下Redission锁的源码实现

    引入Maven依赖

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>2.2.13</version>
            </dependency>
    

    代码如下

     String ip="";
            String port="";
            Config config=new Config();
            config.useSingleServer().setAddress(ip+":"+port);
            RedissonClient redissonClient=Redisson.create(config);
            RLock lock = redissonClient.getLock("lock");
            lock.tryLock(0, 1, TimeUnit.SECONDS);//第一个参数代表等待时间,第二是代表超过时间释放锁,第三个代表设置的时间制
            try {
               System.out.println("执行");
            } finally {
              lock.unlock();
            }
    

    源码分析

    先来看常用的Lock方法实现。

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
    

    再看lockInterruptibly方法

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        if (ttl == null) { // 获取成功
            return;
        }
        // 异步订阅redis chennel
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future); // 阻塞获取订阅结果
        try {
            while (true) {// 循环判断知道获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }
                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);// 取消订阅
        }
    }
    

    小结:lockInterruptibly:获取锁,不成功则订阅释放锁的消息,获得消息前阻塞。得到释放通知后再去循环获取锁。

    下面重点看看如何获取锁:Long ttl = tryAcquire(leaseTime, unit, threadId)

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));// 通过异步获取锁,但get(future)实现同步
    }
    

    tryAcquireAsync()方法

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) { //1 如果设置了超时时间,直接调用 tryLockInnerAsync
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //2 如果leaseTime==-1,则默认超时时间为30s
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        //3 监听Future,获取Future返回值ttlRemaining(剩余超时时间),获取锁成功,但是ttlRemaining,则刷新过期时间
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
    

    tryLockInnerAsync()方法

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(
            getName(), 
            LongCodec.INSTANCE, 
            command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    这个方法主要就是调用redis执行eval lua,为什么使用eval,因为redis对lua脚本执行具有原子性。把这个方法翻译一下:

    -- 1.  没被锁{key不存在}
    eval "return redis.call('exists', KEYS[1])" 1 myLock
    -- (1) 设置Lock为key,uuid:threadId为filed, filed值为1
    eval "return redis.call('hset', KEYS[1], ARGV[2], 1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (2) 设置key过期时间{防止获取锁后线程挂掉导致死锁}
    eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 2. 已经被同线程获得锁{key存在并且field存在}
    eval "return redis.call('hexists', KEYS[1], ARGV[2])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (1) 可重入,但filed字段+1
    eval "return redis.call('hincrby', KEYS[1], ARGV[2],1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (2) 刷新过去时间
    eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 3. 已经被其他线程锁住{key存在,但是field不存在}:以毫秒为单位返回 key 的剩余超时时间
    eval "return redis.call('pttl', KEYS[1])" 1 myLock
    

    这就是核心获取锁的方式,下面直接释放锁方法unlockInnerAsync:

    -- 1. key不存在
    eval "return redis.call('exists', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (1) 发送释放锁的消息,返回1,释放成功
    eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 2. key存在,但field不存在,说明自己不是锁持有者,无权释放,直接return nil
    eval "return redis.call('hexists', KEYS[1], ARGV[3])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    eval "return nil"
    -- 3. filed存在,说明是本线程在锁,但有可能其他地方重入锁,不能直接释放,应该-1
    eval "return redis.call('hincrby', KEYS[1], ARGV[3],-1)" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 4. 如果减1后大于0,说明还有其他重入锁,刷新过期时间,返回0。
    eval "return redis.call('pexpire', KEYS[1], ARGV[2])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 5. 如果不大于0,说明最后一把锁,需要释放
    -- 删除key
    eval "return redis.call('del', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 发释放消息
    eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 返回1,释放成功
    

    从释放锁代码中看到,删除key后会发送消息,所以上文提到获取锁失败后,阻塞订阅此消息。

    另外,上文提到刷新过期时间方法scheduleExpirationRenewal,指线程获取锁后需要不断刷新失效时间,避免未执行完锁就失效。这个方法的实现原理也类似,只是使用了Netty的TimerTask,每到过期时间1/3就去重新刷一次,如果key不存在则停止刷新。Timer实现大概如下:

    private static void nettyTimer() {
        final int expireTime = 6;
        EventExecutorGroup group = new DefaultEventExecutorGroup(1);
        final Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask -> {
            Future<Boolean> future = group.submit(() -> {
                System.out.println("刷新key的失效时间为"+expireTime +"秒");
                return false;// 但key不存在时,返回true
            });
            future.addListener(future1 -> {
                if (!future.getNow()) {
                    nettyTimer();
                }
            });
        }, expireTime/3, TimeUnit.SECONDS);
    }
    

    即验证了我们之前说的Redission会去刷新过期时间

    相关文章

      网友评论

        本文标题:分布式锁

        本文链接:https://www.haomeiwen.com/subject/oazrjqtx.html