美文网首页Spring Boot
聊聊redisson的分布式锁

聊聊redisson的分布式锁

作者: go4it | 来源:发表于2018-09-21 17:07 被阅读34次

    本文主要研究一下redisson的分布式锁

    maven

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.8.1</version>
            </dependency>
    

    实例

        @Test
        public void testDistributedLock(){
            Config config = new Config();
    //        config.setTransportMode(TransportMode.EPOLL);
            config.useSingleServer()
                    .setAddress("redis://192.168.99.100:6379");
            RedissonClient redisson = Redisson.create(config);
    
    
            IntStream.rangeClosed(1,5)
                    .parallel()
                    .forEach(i -> {
                        executeLock(redisson);
                    });
    
            executeLock(redisson);
        }
    
        public void executeLock(RedissonClient redisson){
            RLock lock = redisson.getLock("myLock");
            boolean locked = false;
            try{
                LOGGER.info("try lock");
                locked = lock.tryLock();
    //            locked = lock.tryLock(1,2,TimeUnit.MINUTES);
                LOGGER.info("get lock result:{}",locked);
                if(locked){
                    TimeUnit.HOURS.sleep(1);
                    LOGGER.info("get lock and finish");
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                LOGGER.info("enter unlock");
                if(locked){
                    lock.unlock();
                }
            }
        }
    

    源码解析

    RedissonLock.tryLock

    redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

        @Override
        public boolean tryLock() {
            return get(tryLockAsync());
        }
    
        @Override
        public RFuture<Boolean> tryLockAsync() {
            return tryLockAsync(Thread.currentThread().getId());
        }
    
        @Override
        public RFuture<Boolean> tryLockAsync(long threadId) {
            return tryAcquireOnceAsync(-1, null, threadId);
        }
    
        private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            }
            RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
    
                    Boolean ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    
        <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    
        protected String getLockName(long threadId) {
            return id + ":" + threadId;
        }
    
        private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    
                    RFuture<Boolean> future = renewExpirationAsync(threadId);
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
    
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
                task.cancel();
            }
        }
    
        protected RFuture<Boolean> renewExpirationAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
        }
    
        private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            }
            RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                @Override
                public void operationComplete(Future<Long> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
    
                    Long ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining == null) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    
    • 这里leaseTime没有设置的话,默认是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默认为30秒
    • tryLockInnerAsync使用的是一段lua脚本,该脚本有3个参数,第一个参数为KEYS数组,后面几个参数为ARGV数组的元素
    • 这里key的值为调用方指定的这个redissonLock的名称,两个变量,第一个为leaseTime,第二个为锁的名称,使用redissonLock的id+线程id
    • lua脚本第一个方法判断redissonLock的hashmap是否存在,如果不存在则创建,该hashmap有一个entry的key为锁名称,valude为1,之后设置该hashmap失效时间为leaseTime
    • lua脚本第二个方法是在redissonLock的hashmap存在的情况下,将该锁名的value增1,同时设置失效时间为leaseTime
    • 最后返回该redissonLock名称的key的ttl
    • 执行成功之后判断ttl是否还有值,有的话则调用scheduleExpirationRenewal,防止lock未执行完就失效
    • scheduleExpirationRenewal是注册一个延时任务,在internalLockLeaseTime / 3的时候触发,执行的方法是renewExpirationAsync,将该锁失效时间重置回internalLockLeaseTime
    • scheduleExpirationRenewal里头给scheduleExpirationRenewal任务增加listener,如果设置成功之后还会再次递归调用scheduleExpirationRenewal重新注册延时任务
    • tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自动解锁时间时调用的方法,它与tryAcquireOnceAsync的区别在于,它对ttl的方回值采用long值来判断,如果是null,才执行延长失效时间的定时任务,而tryAcquireOnceAsync方法采用的是BooleanNullReplayConvertor,只要返回不是null,则返回true

    RedissonLock.unlock

    redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

        @Override
        public void unlock() {
            try {
                get(unlockAsync(Thread.currentThread().getId()));
            } catch (RedisException e) {
                if (e.getCause() instanceof IllegalMonitorStateException) {
                    throw (IllegalMonitorStateException)e.getCause();
                } else {
                    throw e;
                }
            }
            
    //        Future<Void> future = unlockAsync();
    //        future.awaitUninterruptibly();
    //        if (future.isSuccess()) {
    //            return;
    //        }
    //        if (future.cause() instanceof IllegalMonitorStateException) {
    //            throw (IllegalMonitorStateException)future.cause();
    //        }
    //        throw commandExecutor.convertException(future);
        }
    
        @Override
        public RFuture<Void> unlockAsync(final long threadId) {
            final RPromise<Void> result = new RedissonPromise<Void>();
            RFuture<Boolean> future = unlockInnerAsync(threadId);
    
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (!future.isSuccess()) {
                        cancelExpirationRenewal(threadId);
                        result.tryFailure(future.cause());
                        return;
                    }
    
                    Boolean opStatus = future.getNow();
                    if (opStatus == null) {
                        IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                + id + " thread-id: " + threadId);
                        result.tryFailure(cause);
                        return;
                    }
                    if (opStatus) {
                        cancelExpirationRenewal(null);
                    }
                    result.trySuccess(null);
                }
            });
    
            return result;
        }
    
        protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
        }
    
        String getChannelName() {
            return prefixName("redisson_lock__channel", getName());
        }
    
        void cancelExpirationRenewal(Long threadId) {
            ExpirationEntry task = expirationRenewalMap.get(getEntryName());
            if (task != null && (threadId == null || task.getThreadId() == threadId)) {
                expirationRenewalMap.remove(getEntryName());
                task.getTimeout().cancel();
            }
        }
    
    • unlockInnerAsync通过lua脚本来释放锁,该lua使用两个key,一个是redissonLock名称,一个是channelName
    • 该lua使用的变量有三个,一个是pubSub的unlockMessage,默认为0,一个是internalLockLeaseTime,默认为commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一个是锁名称
    • 如果该redissonLock不存在,则直接发布unlock消息返回1;如果该锁不存在则返回nil;
    • 如果该锁存在则将其计数-1,如果counter大于0,则重置下失效时间,返回0;如果counter不大于0,则删除该redissonLock锁,发布unlockMessage,返回1;如果上面条件都没有命中返回nil
    • unlockAsync里头对unlockInnerAsync注册了FutureListener,主要是调用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任务

    LockPubSub

    redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java

    public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    
        public static final Long unlockMessage = 0L;
    
        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }
    
        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
            if (message.equals(unlockMessage)) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                }
    
                value.getLatch().release();
            }
        }
    
    }
    
    • 接收到unlockMessage的时候,会调用RedissonLockEntry的listener,然后触发latch的release
    • tryAcquireOnceAsync这个方法默认没有创建LockPubSub,而且没有指定自动解锁时间,则定时任务会一直延长失效时间,这个可能存在锁一直没释放的风险

    小结

    加锁有如下注意事项:

    • 加锁需要设置超时时间,防止出现死锁
    • 加锁以及设置超时时间的时候,需要保证两个操作的原子性,因而最好使用lua脚本或者使用支持NX以及EX的set方法
    • 加锁的时候需要把加锁的调用方信息,比如线程id给记录下来,这个在解锁的时候需要使用
    • 对于加锁时长不确定的任务,为防止任务未执行完导致超时被释放,需要对尚未运行完的任务延长失效时间

    解锁有如下注意事项:

    • 解锁一系列操作(判断key是否存在,存在的话删除key等)需要保证原子性,因而最好使用lua脚本
    • 解锁需要判断调用方是否与加锁时记录的是否一致,防止锁被误删
    • 如果有延续失效时间的延时任务,在解锁的时候,需要终止掉该任务

    doc

    相关文章

      网友评论

        本文标题:聊聊redisson的分布式锁

        本文链接:https://www.haomeiwen.com/subject/tykznftx.html