美文网首页
基于Redis的实现分布式锁(本文提供两种方式)

基于Redis的实现分布式锁(本文提供两种方式)

作者: 初心myp | 来源:发表于2019-04-04 17:24 被阅读0次

    关于分布式锁

    我们学习过并发编程中的锁机制:synchronized和Lock。在单进程的系统中,当多线程同时修改某个变量时,就需要对变量或者代码块做同步,使其在修改该变量时能过线性执行消除并发修改变量。而同步的本质就是通过锁来实现的。为了实现多个线程在同一时刻同一个代码块只能一个线程执行,那么就需要在某个地方做个标记,这个标记必须要每个线程都可见,在标记不存在时,可以设置该标记,其余后续线程发现已经有标记了,就只能等待拥有标记的线程执行完代码块删除标记后,在进行尝试设置标记

    分布式环境下,数据一致性问题也是必要重要的话题,而又与单进程情况不同。分布式与单机最大的不同就是,分布式不是多线程而是多进程。多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标准存储在一个所有进程都能看见的地方

    常见的是秒杀的场景,订单服务部署了多个实例。例如秒杀商品有4个,第一个用户购买3个,第二个用户购买2个,理想情况下应该是第一个用户可以购买成功,第二个用户提示购买失败。而实际的上可能会出现的情况是两个用户看到的商品数量都是4个,而第一个用户买了3个之后,还没更新库,第二个用户下了2个商品的订单,更新库存就会出错

    在上面的场景中,商品的库存是共享变量,面对高并发的情形,需要保证对资源的访问互斥。在单机环境中,Java中其实提供了很多并发处理相关的api,但是这些api在分布式场景中就无能为力了。也就是说单纯的Java api并不能提供分布式锁的能力。分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同的机器中,synchronized和Lock这两种锁将失去原有锁的效果,需要我们自己实现分布式锁

    常见的锁方案:
    • 基于数据库实现的分布式锁
    • 基于缓存实现的分布式锁,例如Redis
    • 基于zookeeper实现分布式锁

    基于Redis实现的分布式锁

    SETNX

    使用Redis的setnx实现分布式锁,多进程执行以下Redis命令:

    SETNX lock.id <current Unix time + lock timeout + 1>
    

    SETNX是将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX不做任何操作。

    • 返回1,说明该进程获得所,SETNX将键lock.id的值设置为锁的超时时间,当前时间+加上锁的有效时间。
    • 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX操作,已获得锁
    存在死锁的问题

    SETNX实现分布式锁,可能会存在死锁的情况。与单机模式下的锁相比,分布式环境下不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。某个线程获取了锁之后,断开了与Redis的连接,锁没有及时释放,竞争该锁的其他线程都会hung,产生死锁的情况。
    在使用SETNX获得锁时,我们将键lock.id的值设置为锁的有效时间,线程获得锁后,其他线程还会不断地检索锁是否已超时,如果超时,等待的线程也将有机会获得锁。然而,锁超时,我们不能简单的使用del命令删除键lock.id已释放锁。
    考虑情况如下:

    1. A已经首先获得了锁lock.id,然后线A断线。B,C都在等待竞争该锁;
    
    2. B,C读取lock.id的值,比较当前时间和键lock.id的值来判断是否超时,发现超时;
    
    3. B执行del lock.id命令,并执行SETNX lock.id命令,并返回1,B获得锁;
    
    4. C由于刚刚检测到锁已超时,执行DEL lock.id命令,将B刚刚设置的键lock.id删除,执行SETNX lock.id命令,并返回1,
       即C获得锁。
    

    上面的步骤很明显出现了问题,导致B,C同时获取了锁。在检测超时后,线程不能直接简单的执行DEL删除键的操作已获得锁。

    对于上面的操作步骤进行改进,问题是出在删除键的操作上面,那么获取锁之后应该怎么改进呢?首先看一下Redis的getset这个操作,GETSET key value,将给定key的值设为value,并返回key的旧值(old value)。利用这个操作指令,我们改进一下上述步骤。

    1. A已经首先获得了锁lock.id,然后A断线。B,C都在等待竞争该锁;
    
    2. B,C读取lock.id的值,比较当前的时间和键lock.id的值来判断是否超时,发现超时;
    
    3. B检测到锁已超时,即当前的时间大于键lock.id的值,B执行
       GETSET lock.id <current Unix timestamp + lock timeout + 1> 
       设置时间戳,通过比较键lock.id的旧值是否小于当前时间,判断进程是否已获得锁;
    
    4. B发现GETSET返回的值小于当前时间,则执行DEL lock.id命令,并执行SETNX lock.id命令,并返回1,B获得锁;
    
    5. C执行GETSET得到的时间大于当前时间,则继续等待
    

    在线程释放锁,即执行DEL lock.id操作之前,需要判断锁是否已超时。如果锁已超时,那么所可能已有其他线程获得,这是直接执行DEL lock.id操作会导致把其他线程已获得的锁释放掉

    根据上面的逻辑,代码实现方式:基于StringRedisTemplate实现

    @Slf4j
    public class TestLock {
        
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        //添加锁方法
        public boolean lock(String key ,String value) throws InterruptedException {
            //相当于setnx方法,将key-value设置进去,如果存在值则不进行任何操作
            if (stringRedisTemplate.opsForValue().setIfAbsent(key,value)) {
                return true;
            }
            //获取当前key的value(获取当前key对应的过期时间)
            String currentValue = stringRedisTemplate.opsForValue().get(key);
            long currentValueLong = Long.parseLong(currentValue);
            // 判断当前key对应的value(过期时间),如果不为空并且小于当前时间,就说明上一个锁已经过期
            if (StringUtils.isNotEmpty(currentValue) && currentValueLong < System.currentTimeMillis()) {
                //执行GETSET方法将新的key和过期时间存储,并返回旧值(旧值指上一个key对应的value)
                String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);
                //判断旧值不为空,并且旧值等于上一个value,说明
                if (StringUtils.isNotEmpty(oldValue) && oldValue.equals(currentValue)) {
                    return true;
                }
            }
            return false;
        }
    
        //释放锁
        public void unLock(String key, String value) {
            try {
                // 获取当前redis中key对应值value
                String currentValue = stringRedisTemplate.opsForValue().get(key);
                //  校验当前值是否为空,且获取值是否与传递值一致
                if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                    //  删除redis中key信息
                    stringRedisTemplate.opsForValue().getOperations().delete(key);
                }
            } catch (Exception e) {
                log.error("【redis分布式锁】减锁异常", e);
            }
        }
    }
    

    另外一种实现逻辑:

    1. A首先带着lock.id,过期时间和尝试时间通过setnx获取锁。随机生成一个值,作为key对应的value值,并设置key对应的
       过期时间,然后返回当前value,表示获取锁成功
    2. B,C读取lock.id的值,带着相应的过期时间和尝试时间,来竞争该锁。
    3. B,C都通过setnx方法尝试,如果B通过setnx返回1,那么说明A已经结束,B将设置对应的key和value,以及相应的过期时间。
       C则通过方法setnx返回0,并返回null,表示获取锁失败,继续等待
    

    具体代码实现如下:基于jedis实现

    @Slf4j
    public class RedisLock {
    
        private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
    
        private JedisPool jedisPool;
    
        public RedisLock(JedisPool jedisPool) {
            this.jedisPool = jedisPool;
        }
    
        public String lock(String key) {
            return this.lockWithTimeout(key, 300L, 3000L);
        }
    
    
        /***
         * redis实现分布式锁,入参key,acquireTimeout(尝试获取时间,单位:ms),expirationTime(key的到期时间,单位:ms)
         * 该方法使用的是Redis中的setnx方法来获取锁
         * */
        public String lockWithTimeout(String key, long acquireTimeout, long expirationTime) {
            Jedis conn = null;
            //返回对象
            Object retIdentifier = null;
    
            try {
                conn = this.jedisPool.getResource();
                String identifier = UUID.randomUUID().toString();
                //重新命名key
                String lockKey = "lock:" + key;
                //单位换算,由ms换算为s
                int lockExpire = (int)(expirationTime / 1000L);
                //计算重试的结束时间,如果入参acquireTimeout为0,尝试结束时间就是在当前时间加300ms,如果入参acquireTimeout不为0,尝试结束时间就是在当前时间加acquireTimeout,
                long end = System.currentTimeMillis() + (acquireTimeout == 0L ? 300L : acquireTimeout);
    
                //判断当前时间是否小于尝试结束时间
                while(System.currentTimeMillis() < end) {
                    //使用setnx方法判断是否有值,如果有值,则线程睡眠10ms,然后返回空,获取锁失败
                    if (conn.setnx(lockKey, identifier) == 1L) {
                        //设置当前key的有效期,单位秒
                        conn.expire(lockKey, lockExpire);
                        String var13 = identifier;
                        //返回当前value值,获取锁成功。
                        return var13;
                    }
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException var18) {
                        Thread.currentThread().interrupt();
                    }
                }
                return (String)retIdentifier;
            } catch (JedisException var19) {
                logger.error("JedisException:" + var19 + "\t key:" + key);
                return (String)retIdentifier;
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    
        //释放锁
        public boolean unLock(String key, String identifier) {
            Jedis conn = null;
            String lockKey = "lock:" + key;
            boolean retFlag = false;
            try {
                conn = this.jedisPool.getResource();
                if (identifier.equals(conn.get(lockKey))) {
                    conn.del(lockKey);
                    retFlag = true;
                }
            } catch (JedisException var10) {
                logger.error("JedisException:" + var10 + "\t key:" + key);
            } finally {
                if (conn != null) {
                    conn.close();
                }
    
            }
            return retFlag;
        }
    }
    
    

    原文参考

    相关文章

      网友评论

          本文标题:基于Redis的实现分布式锁(本文提供两种方式)

          本文链接:https://www.haomeiwen.com/subject/zxfzbqtx.html