美文网首页我的微服务设计方案Redis
synchronized锁和redis分布式锁的使用

synchronized锁和redis分布式锁的使用

作者: 扮鬼之梦 | 来源:发表于2019-11-06 11:38 被阅读0次

准备工作

1.商品库存都以50为例存在redis中

image

2.商品购买接口

Controller

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @GetMapping("/buy")
    public String buy(){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
        return "ok";
    }
}

3.使用jMeter进行压测

设置访问地址:http://localhost/lock-service/test/buy

image
设置并发数,这里是0s内发送200个请求。
image

测试

1.未加锁

image
image

结果:原库存为50,但成功卖出了200件商品,卖出200件商品后库存变为20,商品超卖了。

2.加synchronized锁

重点为锁对象的选取,这里使用商品id字符串,对应的常量池中的引用做为锁对象。

学习synchronized的时候一般都是用this做为锁对象的,这里如果使用this,购买不同商品时,也会争夺同一把锁,效率较低。

@GetMapping("/buy")
public String buy(){
    //使用id的字符串做为锁对象,intern方法 (返回常量池中该字符串的引用)
    String poductId = "100";
    String lock = poductId.intern();
    synchronized (lock.getClass()){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");
        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }
    return "ok";
}
image
image

结果:原库存为50,成功卖出了50件商品,卖出50件商品后库存变为0,加锁成功

3.加synchronized锁,并启动多个商品服务

image

我这里启动两个商品购买服务,锁还是使用synchronized锁,使用SpringClouldGateway负载均衡调用商品服务。

商品服务实例1


image

商品服务实例2


image

结果:通过实例1和实例2的结果可以看出,至少36、35、22、18存在着重复卖出,商品超卖了。因为两个商品服务是启动在两个jvm中,synchronized无法实现跨虚拟机加锁,所以分布式系统中不能使用synchronized锁。

4.加Redis分布式锁,并启动多个商品服务

官方文档

a.引入依赖,SpringBoot版本自选

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>5.2.0.RELEASE</version>
</dependency>

b.yml配置

spring: 
  redis:
    database: 0 # Redis数据库索引(默认为0)
    host: 127.0.0.1
    port: 6379
    jedis:
      pool:
        max-active: 30 # 连接池最大连接数(使用负值表示没有限制)
        max-idle: 60 # 连接池中的最大空闲连接
        min-idle: 20 # 连接池中的最小空闲连接

c.注入Bean对象

@Bean
RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
    return new RedisLockRegistry(connectionFactory, "redisLock", 5000L);
}

注 RedisLockRegistry 提供两个构造方法, 上述示例, 最后一个参数为 默认过期时间

To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but you can configure this value on the registry. Locks are normally held for a much smaller time.

d.使用Redis锁

@Autowired
private RedisLockRegistry redisLockRegistry;

@GetMapping("/buy")
public String buy(){
    String poductId = "100";
    Lock lock = redisLockRegistry.obtain("buyRedis:" + poductId);
    try{
        lock.lock();
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }finally {
        lock.unlock();
    }

    return "ok";
}

e.启动两个商品服务进行测试

因为我设置50个商品时,总是在某一个实例里卖完了所有商品(执行太快了),所以我把商品数量设成了200.
实例1


image

实例2


image
image.png

结果:原库存为200,成功卖出了200件商品,卖出200件商品后库存变为0,加分布式锁成功。

自己实现一个redis锁

1.代码

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @GetMapping("/buy")
    public String buy(){
        String lockKey = "productKey_100";

    //这个锁是阻塞型的锁且会自动增加锁的生存时间
        lock(lockKey);

        try {
        //设置购买操作的时间大于锁的默认生存时间9s,测试锁的续命效果
            Thread.sleep(12000L);
            buyProduct();
        }catch (Exception e){
            System.out.println("休眠错误");
        }finally {
            unlock(lockKey);
        }

        return "ok";
    }

    /**
     * 购买商品
     */
    private void buyProduct(){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }

    /**
     * 加锁
     */
    private void lock(String lockKey){
        String uuid = UUID.randomUUID().toString();
        while(true){
            if(redisTemplate.opsForValue().setIfAbsent(lockKey,uuid , 9, TimeUnit.SECONDS)){
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        Timer timer = new Timer();

                        TimerTask timerTask = new TimerTask() {
                            @Override
                            public void run() {
                //锁存在则将生存时间重置为9s
                                String o =(String) redisTemplate.opsForValue().get(lockKey);
                                if(uuid.equals(o)){
                                    redisTemplate.expire(lockKey,9,TimeUnit.SECONDS);
                                }else{
                                    timer.cancel();
                                }
                            }
                        };
            //定时器启动3s后执行第一次,之后每隔3s执行一次
                        timer.schedule(timerTask,3000L,3000L);
                    }
                });

                thread.run();
                break;
            }
        }
    }

    /**
     * 解锁锁
     */
    private void unlock(String lockKey){
        redisTemplate.delete(lockKey);
    }
}

2.效果

设置30个库存,并发发送30个购买请求,执行了12*30秒,得到结果如下


image
image
image

30个商品都成功卖出,卖出后库存为0,加锁成功,锁的阻塞效果和锁的续命效果也成功了。

容易踩的坑

1.事务中加锁,会导致锁失效

原因:由于事务是加锁前开启的,锁是事务未提交前释放的,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。

解决方法:我们可以在@Transactional注释的方法之前就加上锁,在还没有开事务之前就加锁,那么就可以保证线程的安全性,从而不会出现脏读和数据不一致性等情况。

2.锁对象的选取

锁对象的选取要和实际业务关联,例如商品购买,就要和商品关联。一律使用当前类(this)做锁对象效率不高。

同时对于同一商品的锁对象要是相同对象(同一引用的对象)。

3.单实例服务可以使用非分布式的锁,多实例服务只能使用分布式锁

分布式锁的实现方案有很多

相关文章

  • Redis的基本使用(-) 分布式锁

    Redis的基本使用(-) 分布式锁 1、Redis做分布式锁 分布式锁是Redis较常见的使用场景。 问题场景:...

  • 使用JVM提高秒杀系统性能

    前提 使用redis分布式锁,解决秒杀系统库存为零 继续扣减问题 redis分布式锁出现的问题 使用redis锁,...

  • Redis分布式锁的实现

    Redis分布式锁的实现 参考 使用 Spring Boot AOP 实现 Web 日志处理和分布式锁 Redis...

  • 分布式锁

    为什么要用分布式锁 数据库乐观锁redis分布式锁zookeeper分布式锁 使用分布式锁的场景 实现分布式锁的方...

  • synchronized锁和redis分布式锁的使用

    《Redis分布式锁的使用》 准备工作 1.商品库存都以10为例 2.商品购买接口 Controller Serv...

  • Redis实现分布式锁

    Redis实现分布式锁 一、Redis单节点实现 (一) 获取锁 使用 Redis 客户端获取锁,向Redis发出...

  • Redis分布式锁

    1.Redis分布式锁概述 除了Redis,还能使用什么作为分布式锁? 利用Redis的setnx(SET if ...

  • 84 redis实现分布式锁的原理

    1,Redis使用setnx 实现2,Redisson 分布式锁;Redis基于 setnx 实现分布式锁原理:R...

  • 大佬浅谈分布式锁

    redis 实现 redis 分布锁一、redis 实现分布式锁(可重入锁)redission 实现分布式锁1、对...

  • 分布式锁用Redis 还是Zookeeper

    本文主要包括为什么使用分布式锁以及使用Redis 作为分布式锁,涉及到redis的模式、redisson、redl...

网友评论

    本文标题:synchronized锁和redis分布式锁的使用

    本文链接:https://www.haomeiwen.com/subject/zlcubctx.html