美文网首页我的微服务设计方案Redis
synchronized锁和redis分布式锁的使用

synchronized锁和redis分布式锁的使用

作者: 扮鬼之梦 | 来源:发表于2019-11-06 11:38 被阅读0次

    准备工作

    1.商品库存都以50为例存在redis中

    image

    2.商品购买接口

    Controller

    @RestController
    @RequestMapping("/test")
    public class TestController {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @GetMapping("/buy")
        public String buy(){
            //查询当前商品库存
            Integer store = (Integer) redisTemplate.opsForValue().get("store");
    
            if(store > 0){
                //库存不为0则库存减一
                store =  store - 1;
                redisTemplate.opsForValue().set("store",store);
                System.out.println("购买成功,购买后库存为:"+ store);
            }else{
                System.out.println("购买失败");
            }
            return "ok";
        }
    }
    

    3.使用jMeter进行压测

    设置访问地址:http://localhost/lock-service/test/buy

    image
    设置并发数,这里是0s内发送200个请求。
    image

    测试

    1.未加锁

    image
    image

    结果:原库存为50,但成功卖出了200件商品,卖出200件商品后库存变为20,商品超卖了。

    2.加synchronized锁

    重点为锁对象的选取,这里使用商品id字符串,对应的常量池中的引用做为锁对象。

    学习synchronized的时候一般都是用this做为锁对象的,这里如果使用this,购买不同商品时,也会争夺同一把锁,效率较低。

    @GetMapping("/buy")
    public String buy(){
        //使用id的字符串做为锁对象,intern方法 (返回常量池中该字符串的引用)
        String poductId = "100";
        String lock = poductId.intern();
        synchronized (lock.getClass()){
            //查询当前商品库存
            Integer store = (Integer) redisTemplate.opsForValue().get("store");
            if(store > 0){
                //库存不为0则库存减一
                store =  store - 1;
                redisTemplate.opsForValue().set("store",store);
                System.out.println("购买成功,购买后库存为:"+ store);
            }else{
                System.out.println("购买失败");
            }
        }
        return "ok";
    }
    
    image
    image

    结果:原库存为50,成功卖出了50件商品,卖出50件商品后库存变为0,加锁成功

    3.加synchronized锁,并启动多个商品服务

    image

    我这里启动两个商品购买服务,锁还是使用synchronized锁,使用SpringClouldGateway负载均衡调用商品服务。

    商品服务实例1


    image

    商品服务实例2


    image

    结果:通过实例1和实例2的结果可以看出,至少36、35、22、18存在着重复卖出,商品超卖了。因为两个商品服务是启动在两个jvm中,synchronized无法实现跨虚拟机加锁,所以分布式系统中不能使用synchronized锁。

    4.加Redis分布式锁,并启动多个商品服务

    官方文档

    a.引入依赖,SpringBoot版本自选

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-redis</artifactId>
        <version>5.2.0.RELEASE</version>
    </dependency>
    

    b.yml配置

    spring: 
      redis:
        database: 0 # Redis数据库索引(默认为0)
        host: 127.0.0.1
        port: 6379
        jedis:
          pool:
            max-active: 30 # 连接池最大连接数(使用负值表示没有限制)
            max-idle: 60 # 连接池中的最大空闲连接
            min-idle: 20 # 连接池中的最小空闲连接
    

    c.注入Bean对象

    @Bean
    RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
        return new RedisLockRegistry(connectionFactory, "redisLock", 5000L);
    }
    

    注 RedisLockRegistry 提供两个构造方法, 上述示例, 最后一个参数为 默认过期时间

    To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but you can configure this value on the registry. Locks are normally held for a much smaller time.

    d.使用Redis锁

    @Autowired
    private RedisLockRegistry redisLockRegistry;
    
    @GetMapping("/buy")
    public String buy(){
        String poductId = "100";
        Lock lock = redisLockRegistry.obtain("buyRedis:" + poductId);
        try{
            lock.lock();
            //查询当前商品库存
            Integer store = (Integer) redisTemplate.opsForValue().get("store");
    
            if(store > 0){
                //库存不为0则库存减一
                store =  store - 1;
                redisTemplate.opsForValue().set("store",store);
                System.out.println("购买成功,购买后库存为:"+ store);
            }else{
                System.out.println("购买失败");
            }
        }finally {
            lock.unlock();
        }
    
        return "ok";
    }
    

    e.启动两个商品服务进行测试

    因为我设置50个商品时,总是在某一个实例里卖完了所有商品(执行太快了),所以我把商品数量设成了200.
    实例1


    image

    实例2


    image
    image.png

    结果:原库存为200,成功卖出了200件商品,卖出200件商品后库存变为0,加分布式锁成功。

    自己实现一个redis锁

    1.代码

    @RestController
    @RequestMapping("/test")
    public class TestController {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @GetMapping("/buy")
        public String buy(){
            String lockKey = "productKey_100";
    
        //这个锁是阻塞型的锁且会自动增加锁的生存时间
            lock(lockKey);
    
            try {
            //设置购买操作的时间大于锁的默认生存时间9s,测试锁的续命效果
                Thread.sleep(12000L);
                buyProduct();
            }catch (Exception e){
                System.out.println("休眠错误");
            }finally {
                unlock(lockKey);
            }
    
            return "ok";
        }
    
        /**
         * 购买商品
         */
        private void buyProduct(){
            //查询当前商品库存
            Integer store = (Integer) redisTemplate.opsForValue().get("store");
    
            if(store > 0){
                //库存不为0则库存减一
                store =  store - 1;
                redisTemplate.opsForValue().set("store",store);
                System.out.println("购买成功,购买后库存为:"+ store);
            }else{
                System.out.println("购买失败");
            }
        }
    
        /**
         * 加锁
         */
        private void lock(String lockKey){
            String uuid = UUID.randomUUID().toString();
            while(true){
                if(redisTemplate.opsForValue().setIfAbsent(lockKey,uuid , 9, TimeUnit.SECONDS)){
                    Thread thread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            Timer timer = new Timer();
    
                            TimerTask timerTask = new TimerTask() {
                                @Override
                                public void run() {
                    //锁存在则将生存时间重置为9s
                                    String o =(String) redisTemplate.opsForValue().get(lockKey);
                                    if(uuid.equals(o)){
                                        redisTemplate.expire(lockKey,9,TimeUnit.SECONDS);
                                    }else{
                                        timer.cancel();
                                    }
                                }
                            };
                //定时器启动3s后执行第一次,之后每隔3s执行一次
                            timer.schedule(timerTask,3000L,3000L);
                        }
                    });
    
                    thread.run();
                    break;
                }
            }
        }
    
        /**
         * 解锁锁
         */
        private void unlock(String lockKey){
            redisTemplate.delete(lockKey);
        }
    }
    

    2.效果

    设置30个库存,并发发送30个购买请求,执行了12*30秒,得到结果如下


    image
    image
    image

    30个商品都成功卖出,卖出后库存为0,加锁成功,锁的阻塞效果和锁的续命效果也成功了。

    容易踩的坑

    1.事务中加锁,会导致锁失效

    原因:由于事务是加锁前开启的,锁是事务未提交前释放的,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。

    解决方法:我们可以在@Transactional注释的方法之前就加上锁,在还没有开事务之前就加锁,那么就可以保证线程的安全性,从而不会出现脏读和数据不一致性等情况。

    2.锁对象的选取

    锁对象的选取要和实际业务关联,例如商品购买,就要和商品关联。一律使用当前类(this)做锁对象效率不高。

    同时对于同一商品的锁对象要是相同对象(同一引用的对象)。

    3.单实例服务可以使用非分布式的锁,多实例服务只能使用分布式锁

    分布式锁的实现方案有很多

    相关文章

      网友评论

        本文标题:synchronized锁和redis分布式锁的使用

        本文链接:https://www.haomeiwen.com/subject/zlcubctx.html