美文网首页
redis分布式锁

redis分布式锁

作者: 寒雨然 | 来源:发表于2020-02-28 10:57 被阅读0次
    import io.lettuce.core.ScriptOutputType;
    import io.lettuce.core.SetArgs;
    import io.lettuce.core.api.async.RedisAsyncCommands;
    import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
    import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.Assert;
    
    import javax.annotation.Resource;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ExecutionException;
    
    
    /**
     * Redis-backed lock service.
     *
     * <p>A lock is acquired with {@code SET key token NX EX seconds}, where the token is a
     * freshly generated UUID, and released with a Lua script that deletes the key only if it
     * still holds that token.
     *
     * <p>Tokens are remembered per thread and per key, so a lock must be released by the same
     * thread that acquired it, and one thread may hold locks on several different keys at once.
     * Acquiring a key that is already set in Redis fails, including when the calling thread is
     * the current holder (the lock is not reentrant).
     */
    @Component
    public class RedisLockServiceImpl implements RedisLockService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockService.class);
    
        /** Reply returned by Redis when {@code SET ... NX} stored the value. */
        private static final String RESULT_STRING = "OK";
    
        /** Default lock time-to-live in seconds, used by {@link #tryLock(String)}. */
        private static final Integer LOCK_EXPIRE = 300;
    
        /**
         * Lua script for compare-and-delete: removes KEYS[1] only when its value equals ARGV[1],
         * returning the DEL result, otherwise returns 0.
         */
        private static final String UNLOCK_LUA =
                "if redis.call(\"get\",KEYS[1]) == ARGV[1] "
                        + "then "
                        + "    return redis.call(\"del\",KEYS[1]) "
                        + "else "
                        + "    return 0 "
                        + "end ";
    
        /**
         * Tokens of the locks acquired by the current thread, indexed by lock key.
         * Keeping one token per key (instead of one token per thread) prevents a second
         * acquisition from overwriting the token of a lock that is still held.
         */
        private final ThreadLocal<Map<String, String>> lockValues = new ThreadLocal<>();
    
        // Assigned by the constructor and additionally annotated with @Resource;
        // the annotation is kept as-is so bean wiring is not altered.
        @Resource
        private RedisTemplate<String, String> redisTemplate;
    
        /**
         * Creates the service.
         *
         * @param redisTemplate template used to obtain Redis connections; must not be null
         * @throws IllegalArgumentException if {@code redisTemplate} is null
         */
        public RedisLockServiceImpl(RedisTemplate<String, String> redisTemplate) {
            Assert.notNull(redisTemplate, "redisTemplate should not be null.");
            this.redisTemplate = redisTemplate;
        }
    
        /**
         * Tries to acquire the lock with the default expiry of {@link #LOCK_EXPIRE} seconds.
         *
         * @param key lock key
         * @return {@code true} if the lock was acquired, {@code false} otherwise
         */
        @Override
        public boolean tryLock(String key) {
            return setRedisLock(key, LOCK_EXPIRE);
        }
    
        /**
         * Tries to acquire the lock with a caller-supplied expiry.
         *
         * @param key     lock key
         * @param timeOut lock time-to-live, in seconds
         * @return {@code true} if the lock was acquired, {@code false} otherwise
         */
        @Override
        public boolean tryLock(String key, Integer timeOut) {
            return setRedisLock(key, timeOut);
        }
    
        /**
         * Releases the lock on {@code key} if the current thread acquired it.
         *
         * <p>The key cannot simply be deleted: if the guarded work outlived the expiry, another
         * client may already hold the lock. The Lua script therefore deletes the key only when
         * it still carries the token stored by this thread.
         *
         * @param key lock key
         * @return {@code true} if the key was deleted; {@code false} if this thread holds no
         *         token for the key, or the lock expired or now belongs to someone else
         */
        @Override
        public boolean releaseLock(String key) {
            Map<String, String> held = lockValues.get();
            if (held == null) {
                return false;
            }
            // Forget the token before talking to Redis so the thread-local entry is cleaned up
            // even when the unlock command fails; the key's expiry then frees the lock.
            String value = held.remove(key);
            if (held.isEmpty()) {
                lockValues.remove();
            }
            if (value == null) {
                return false;
            }
            Long result = redisTemplate.execute(
                    (RedisCallback<Long>) connection -> releaseLockCall(connection, key, value));
            // The script yields 0 when the token no longer matches and 1 when the key was deleted.
            return result != null && result > 0;
        }
    
        /**
         * Performs the acquisition and, on success, records the token for the current thread.
         *
         * @param key    lock key
         * @param expire lock time-to-live, in seconds
         * @return {@code true} if Redis answered {@code OK}
         */
        private boolean setRedisLock(String key, Integer expire) {
            String uuid = UUID.randomUUID().toString();
            String result = redisTemplate.execute(
                    (RedisCallback<String>) connection -> lockCallBack(connection, key, uuid, expire));
            if (!RESULT_STRING.equals(result)) {
                return false;
            }
            // Remember the token under its key; release compares it with the value in Redis
            // to decide whether the lock still belongs to this thread.
            Map<String, String> held = lockValues.get();
            if (held == null) {
                held = new HashMap<>();
                lockValues.set(held);
            }
            held.put(key, uuid);
            return true;
        }
    
        /**
         * Issues {@code SET key uuid NX EX expire} on the native Lettuce connection, handling
         * both the cluster and the standalone command interfaces.
         *
         * @param connection Redis connection supplied by the template
         * @param key        lock key
         * @param uuid       token stored as the key's value
         * @param expire     time-to-live, in seconds
         * @return the reply of the SET command, or an empty string when the native connection
         *         is of neither supported type
         */
        @SuppressWarnings("unchecked") // key/value types of the native connection cannot be checked at runtime
        private String lockCallBack(RedisConnection connection, String key, String uuid, Integer expire) {
            Object nativeConnection = connection.getNativeConnection();
            byte[] keyByte = key.getBytes(StandardCharsets.UTF_8);
            byte[] valueByte = uuid.getBytes(StandardCharsets.UTF_8);
            // Cluster mode. NX makes the set-if-absent atomic; EX bounds the lock lifetime so a
            // crashed holder cannot block others forever.
            if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                RedisAdvancedClusterAsyncCommands<byte[], byte[]> commands =
                        (RedisAdvancedClusterAsyncCommands<byte[], byte[]>) nativeConnection;
                return commands.getStatefulConnection().sync()
                        .set(keyByte, valueByte, SetArgs.Builder.nx().ex(expire));
            }
            // Standalone mode.
            if (nativeConnection instanceof RedisAsyncCommands) {
                RedisAsyncCommands<byte[], byte[]> commands =
                        (RedisAsyncCommands<byte[], byte[]>) nativeConnection;
                return commands.getStatefulConnection().sync()
                        .set(keyByte, valueByte, SetArgs.Builder.nx().ex(expire));
            }
            LOGGER.warn("[RedisLockService][lockCallBack] unsupported native connection type: {}",
                    nativeConnection == null ? null : nativeConnection.getClass().getName());
            return "";
        }
    
        /**
         * Runs the compare-and-delete Lua script on the native connection.
         *
         * @param connection Redis connection supplied by the template
         * @param key        lock key
         * @param lockValue  token recorded by the current thread for this key
         * @return the script result (1 when the key was deleted, 0 when the token did not match),
         *         or 0 when the script could not be executed
         */
        @SuppressWarnings("unchecked") // key/value types of the native connection cannot be checked at runtime
        private Long releaseLockCall(RedisConnection connection, String key, String lockValue) {
            Object nativeConnection = connection.getNativeConnection();
            if (!(nativeConnection instanceof RedisScriptingAsyncCommands)) {
                LOGGER.warn("[RedisLockService][releaseLockCall] unsupported native connection type: {}",
                        nativeConnection == null ? null : nativeConnection.getClass().getName());
                return 0L;
            }
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            byte[] valueBytes = lockValue.getBytes(StandardCharsets.UTF_8);
            Object[] keyParam = new Object[]{keyBytes};
            RedisScriptingAsyncCommands<Object, byte[]> command =
                    (RedisScriptingAsyncCommands<Object, byte[]>) nativeConnection;
            try {
                // The script executes atomically on the server, so the comparison and the
                // delete cannot be interleaved with another client's commands.
                return (Long) command.eval(UNLOCK_LUA, ScriptOutputType.INTEGER, keyParam, valueBytes).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.warn("[RedisLockService][releaseLockCall] eval command fail", e);
                return 0L;
            } catch (ExecutionException e) {
                LOGGER.warn("[RedisLockService][releaseLockCall] eval command fail", e);
                return 0L;
            }
        }
    
    }
    

    相关文章

      网友评论

          本文标题:redis分布式锁

          本文链接:https://www.haomeiwen.com/subject/gzykhhtx.html