美文网首页
Redis应用之分布式锁

Redis应用之分布式锁

作者: 逍遥白亦 | 来源:发表于2020-11-24 15:08 被阅读0次

    1. 分布式应用并发问题

    分布式应用进行逻辑处理时经常会遇到并发问题,比如下面这个例子。

    在Redis里对"account"这个key进行操作,两个客户端同时要修改account,首先要读取account的值,然后对其进行修改,修改完再存回去,那么就会出现并发问题,因为读取和保存状态这两个操作不是原子的。


    image

    2. Redis实现分布式锁

    2.1 第一个示例

    2.1.1 创建工程

    首先创建一个空工程RedisStart,pom文件如下

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>RedisStart</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
        <modules>
            <module>redis-lock</module>
        </modules>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.3.RELEASE</version>
        </parent>
    
        <profiles>
            <profile>
                <id>jdk-1.8</id>
                <activation>
                    <activeByDefault>true</activeByDefault>
                    <jdk>1.8</jdk>
                </activation>
                <properties>
                    <maven.compiler.source>1.8</maven.compiler.source>
                    <maven.compiler.target>1.8</maven.compiler.target>
                    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
                </properties>
            </profile>
        </profiles>
    
    </project>
    

    2.1.2 创建子工程

    创建子工程redis-lock,工程结构如下


    image

    pom文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>RedisStart</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>redis-lock</artifactId>
        <packaging>jar</packaging>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.6.5</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    配置文件:

    server:
      port: 8090
    
    spring:
      redis:
        host: 127.0.0.1
        port: 6379
    

    2.1.3 启动类

    package com.redisson;
    
    import org.redisson.Redisson;
    import org.redisson.config.Config;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        public Redisson redisson(){
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
            return (Redisson) Redisson.create(config);
        }
    
    }
    
    

    2.1.4 关键类

    首先先在Redis中执行set account 50这条指令,加入数据。然后请看下面代码:

    package com.redisson;
    
    import org.redisson.Redisson;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class IndexController {
    
        @Autowired
        private Redisson redisson;
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @RequestMapping("/deduct account")
        public String deductAccount() throws InterruptedException {
            int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
            if (account > 0){
                int realAccount = account - 1;
                stringRedisTemplate.opsForValue().set("account",realAccount + "");
                System.out.println("扣减成功,剩余库存:"  + realAccount + "");
            }else {
                System.out.println("扣减失败,库存不足");
            }
            return "end";
        }
    
    }
    
    

    上面代码重现了开头提出的问题,如果有多线程知识储备的话,可以很容易想到,加入synchronized关键字即可解决。

        synchronized(this){
            int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
            if (account > 0){
                int realAccount = account - 1;
                stringRedisTemplate.opsForValue().set("account",realAccount + "");
                System.out.println("扣减成功,剩余库存:"  + realAccount + "");
            }else {
                System.out.println("扣减失败,库存不足");
            }
            return "end";
            } 
        }
    

    2.1.5 分布式锁简单实现setnx

    如果上边的服务部署在单机模式下,并且程序没有其他异常的话,可以解决原子性问题,但是现在的服务都是在集群部署下的,如果有多台服务的话,上面代码无法解决这个问题。

    如果看过前面Redis应用之常用数据类型文章的话,可以知道Redis提供了setnx指令实现分布式锁,可以用以下代码实现。

    String lockKey = "lockKey";
            //获取锁信息
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
            //锁还没有被释放,直接返回
            if (!result){
                return "error";
            }
    
            int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
            if (account > 0){
                int realAccount = account - 1;
                stringRedisTemplate.opsForValue().set("account",realAccount + "");
                System.out.println("扣减成功,剩余库存:"  + realAccount + "");
            }else {
                System.out.println("扣减失败,库存不足");
            }
            //业务代码执行完,释放锁
            stringRedisTemplate.delete(lockKey);
            return "end";
    

    这样即使是集群部署下的服务,也可以实现分布式锁功能了,但是如果在执行释放锁那行代码之前,有异常的话,会导致该锁一直被占用,无法释放。

    2.1.6 解决程序异常导致的无法释放锁

    加入finally逻辑,可以保证即使程序有异常的话,也可以释放锁。

    String lockKey = "lockKey";
            try {
                //获取锁信息
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
                //锁还没有被释放,直接返回
                if (!result){
                    return "error";
                }
    
                int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
                if (account > 0){
                    int realAccount = account - 1;
                    stringRedisTemplate.opsForValue().set("account",realAccount + "");
                    System.out.println("扣减成功,剩余库存:"  + realAccount + "");
                }else {
                    System.out.println("扣减失败,库存不足");
                }
            }finally {
                //业务代码执行完,释放锁
                stringRedisTemplate.delete(lockKey);
            }
            
            return "end";
    

    上面代码解决了,如果程序有异常的话,无法释放锁的问题,但是如果在代码执行的一半的时候,服务挂了,或者被Kill掉的话,就不会执行finally里边的代码,所以依然无法释放锁。

    2.1.7 setnx命令的原子操作

    设置锁的时候,直接对该锁加入超时时间,可以解决由于上边的问题。

    String lockKey = "lockKey";
            try {
                //设置锁超时时间
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock",10, TimeUnit.MILLISECONDS);
                //锁还没有被释放,直接返回
                if (!result){
                    return "error";
                }
    
                int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
                if (account > 0){
                    int realAccount = account - 1;
                    stringRedisTemplate.opsForValue().set("account",realAccount + "");
                    System.out.println("扣减成功,剩余库存:"  + realAccount + "");
                }else {
                    System.out.println("扣减失败,库存不足");
                }
            }finally {
                //业务代码执行完,释放锁
                stringRedisTemplate.delete(lockKey);
            }
    
            return "end";
    

    如果并发不高的情况下,上面代码可以实现一个良好的分布式锁,但是如果在高并发情况下,会导致锁一直失效。举个例子:如果锁的失效时间是1秒,A线程成功获得了该锁,但是由于并发比较高,导致程序在1秒钟没有执行完,就不会执行删除锁的代码,但是这个时候由于超过了1秒,所以A线程获取的锁已经失效了, 其他线程可以获得该锁,假设B线程获取到了这把锁,开始执行后边的逻辑,这个时候A线程执行了finally中的逻辑,把B线程获取到的锁删除了,那么其他线程又可以获取到这把锁,而过一段时间该锁又失效了,而B线程就会删除C线程过去到的锁,以此类推,这个锁就会一直失效。

    2.1.8 锁信息与线程信息绑定

    为每一个线程设置线程ID,并将该ID作为锁Key信息的value值,在删除锁的时候,进行判断,如果是本线程自己加的锁,可以删除,如果不是,不能删除。

    String lockKey = "lockKey";
            String clientId = UUID.randomUUID().toString();
            try {
                //设置锁超时时间
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId,30, TimeUnit.MILLISECONDS);
                //锁还没有被释放,直接返回
                if (!result){
                    return "error";
                }
    
                int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
                if (account > 0){
                    int realAccount = account - 1;
                    stringRedisTemplate.opsForValue().set("account",realAccount + "");
                    System.out.println("扣减成功,剩余库存:"  + realAccount + "");
                }else {
                    System.out.println("扣减失败,库存不足");
                }
            }finally {
                //判断该锁是不是当前线程加的
                if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
                    //业务代码执行完,释放锁
                    stringRedisTemplate.delete(lockKey);
                }
            }
    
            return "end";
    

    至此,已经实现了一把比较完善的分布式锁,但是还是会有一些问题,比如业务代码执行时间大于锁的过期时间,如何对该锁进行续期?可以简单的想到,在主线程启动的时候,启动另外一个线程,监控这把锁的状态,启动一个定时任务,定期监控该锁,如果过一段时间,锁还没释放,就把锁的失效时间重新置位启始值。但这事儿说起来容易,实际写的时候,稍不注意,就会引起很多BUG。

    2.2 用Redisson实现一个分布式锁

    好在,贴心的Redisson已经帮我们实现了功能完善的分布式锁。请看下面代码:

            String lockKey = "lockKey";
            RLock redissonLock = redisson.getLock(lockKey);
            try {
                //加锁,实现锁续命
                redissonLock.lock();
                int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
                if (account > 0){
                    int realAccount = account - 1;
                    stringRedisTemplate.opsForValue().set("account",realAccount + "");
                    System.out.println("扣减成功,剩余库存:"  + realAccount + "");
                }else {
                    System.out.println("扣减失败,库存不足");
                }
            }finally {
                //释放锁
                redissonLock.unlock();
            }
    

    可以看到加锁逻辑十分简洁。

    2.2.1 Redisson加锁逻辑

    下边用一张简单的图来说明下:


    image

    2.2.2 源码阅读

    由于整个Redisson的代码实在太多,所以这里只看主要逻辑。

    2.2.2.1 初始化逻辑

    先看下边代码对应的源码

    RLock redissonLock = redisson.getLock(lockKey);
    

    会调用RedissonLock的构造方法,初始化锁的名称以及锁的失效时间为30秒

        public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            this.commandExecutor = commandExecutor;
            this.id = commandExecutor.getConnectionManager().getId();
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        }
    

    初始化锁的失效时间

        private long lockWatchdogTimeout = 30 * 1000;
    

    2.2.2.2 加锁逻辑

    调用RedissonLock.lockInterruptibly()

        @Override
        public void lockInterruptibly() throws InterruptedException {
            lockInterruptibly(-1, null);
        }
    

    下边代码大体逻辑就是,先尝试获取锁,如果获取成功就直接返回,如果没有获取成功,就自旋,直到获取成功。

    @Override
        public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            //获取当前线程ID
            long threadId = Thread.currentThread().getId();
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // 获取锁就返回
            if (ttl == null) {
                return;
            }
    
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            commandExecutor.syncSubscription(future);
    
            try {
                //没获取锁自旋获取
                while (true) {
                    //再次尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    //获取锁就返回
                    if (ttl == null) {
                        break;
                    }
    
                    //自旋的时候等待一段时间,下次再次获取
                    if (ttl >= 0) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    

    获取锁的逻辑

        private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
            return get(tryAcquireAsync(leaseTime, unit, threadId));
        }
    

    继续看tryAcquireAsync(leaseTime, unit, threadId)

        private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            }
            RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                @Override
                public void operationComplete(Future<Long> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
    
                    Long ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining == null) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    

    由于传入的leaseTime为-1,所以请看如下代码

        <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    Redisson源码大量调用了Lua脚本,用于实现原子性。下面来分析这几个脚本,KEYS[1]为锁的名称、ARGV[1]为锁的失效时间、ARGV[2]为当前线程的ID。

        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);",
    

    上边脚本的逻辑为,首先判断锁是否存在,如果锁不存在,那么就设置该锁,并设置失效时间,返回空;如果锁已存在,就判断该锁是否是当前线程加的锁,如果是当前线程加的锁,就对这个锁的调用次数加1,并刷新锁的失效时间,返回空;如果不是当前线程加的锁,那么该返回该锁的失效时间。

    下面来看监听里边相关的代码,这部分代码用了一个延时的任务执行,每隔10秒就查询当前线程获取的锁状态,如果存在就对失效时间进行刷新,并返回1,不存在就直接返回0。

        private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
    

    2.2.3 存在问题

    由于Redis一般是集群部署的,所以会出现由于主节点挂掉的话,从节点会取而代之。这时候如果客户端在主节点上申请成功了一把锁,但是这把锁还没有来得及同步到从节点,主节点突然挂了,然后从节点变为主节点,这个新的节点内部没有这个锁,所以当另一个客户端过来请求加锁时,立即就批准了,这样就导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。

    3. RedLock算法

    为了解决由于主节点挂掉导致多个客户端同时持有一把锁的问题,Antirez发明了RedLock算法,它的流程比较复杂,不过已经有了很多大神开发了开源的库,用户可以拿来即用,比如redlock-py以及Redisson。

    为了使用 Redlock,需要提供多个Redis实例,这些实例之前相互独立没有主从关系。同很多分布式算法一样,redlock 也使用「大多数机制」。

    加锁时,它会向过半节点发送 set(key, value, nx=True, ex=xxx) 指令,只要过半节点 set成功,那就认为加锁成功。释放锁时,需要向所有节点发送 del 指令。不过 Redlock 算法还
    需要考虑出错重试、时钟漂移等很多细节问题,同时因为 Redlock 需要向多个节点进行读写,意味着相比单实例 Redis 性能会下降一些。

    3.1 RedLock使用场景

    如果你很在乎高可用性,希望挂了一台 redis 完全不受影响,那就应该考虑 redlock。不过代价也是有的,需要更多的 redis 实例,性能也下降了,代码上还需要引入额外的library,运维上也需要特殊对待,这些都是需要考虑的成本,使用前请再三斟酌。

    参考资料

    1. 《Redis深度历险》

    相关文章

      网友评论

          本文标题:Redis应用之分布式锁

          本文链接:https://www.haomeiwen.com/subject/qgiiiktx.html