1,为什么需要分布式锁?
在开发应用时,当多个客户或者多个线程需要对某个共享的数据进行操作时,就需要使用线程同步。在Java开发中,对于单机应用,因为是在同一个JVM内部,所以我们可以采用Java提供的各种多线程操作的技巧来实现线程同步。
而对于分布式系统来说,由于多个请求可能被分发到不同的机器上去处理,如果这多个请求都是对同一个资源进行操作,那么使用基本的Java多线程线程同步技术可能就解决不了这个问题。
image
如上图,请求A、B、C都是发起扣减同一个商品的库存操作,三个请求被分发到三台不同的服务部署机器上进行处理。而三台机器并不在同一个JVM,所以Java提供的线程同步技巧就发挥不了作用了。但是对于扣减库存这样的场景,必须要使用线程同步来保证同一个商品的库存不会被漏扣或者多扣。
为了保证在高并发的场景下,临界资源(共享资源)同时只能被一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。
但是在分布式系统中,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
2、分布式锁应该具备哪些条件?
在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
高可用的获取锁与释放锁;
高性能的获取锁与释放锁;
具备可重入特性;
具备锁失效机制,防止死锁;
具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
3、Spring Boot整合Redis实现简单的分布式锁
实现思想
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
delete key:删除key
Spring Boot整合Redis实现简单的分布式锁实现细节
我们以实现一个多个线程同步扣除同一个商品的库存为例,实现一个简单的Redis分布式锁。实例需要依赖的内容如下:
Spring Boot Web 依赖:通过在页面上点击实现多个请求;
Spring JPA:数据库访问;
MySQL:存储商品库存;
DROP TABLE IF EXISTS `goods_store`;
CREATE TABLE `goods_store` (
`code` varchar(255) NOT NULL,
`store` int(255) DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of goods_store
-- ----------------------------
INSERT INTO `goods_store` VALUES ('2019053016502800101', '1968', '2020-02-20 15:54:02');
html:页面模板;
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
依赖配置好之后,如果没有安装Redis和MySQL的,自己先安装好Redis和MySQL,
准备工作做好之后,在Spring Boot配置文件中配置Redis、MySQL的链接属性、Spring Boot应用端口、名称等。
yml配置:
spring:
application:
name: Redis Distribute Lock
redis:
host: 127.0.0.1
port: 6379
timeout: 20000
password: 654321
jedis:
pool:
max-active: 8
min-idle: 0
max-idle: 8
max-wait: -1
datasource:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=true&allowMultiQueries=true&serverTimezone=Asia/Hong_Kong
username: root
password: 111111
driver-class-name: com.mysql.jdbc.Driver
jpa:
show-sql: true
hibernate:
ddl-auto: none
server:
port: 8090
定义实体:
package com.taotao.redisson.entity;
import lombok.Data;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;
@Data
@Entity
@Table(name="goods_store")
public class GoodsStore implements Serializable{
private static final long serialVersionUID=1L;
@Id
private String code;
@Column(name="store")
private int store;
@Column(name="update_time")
private Date update_time;
}
实现Redis分布式锁
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 加锁
* @param lockKey 加锁的Key
* @param timeStamp 时间戳:当前时间+超时时间
* @return
*/
public boolean lock(String lockKey,String timeStamp){
if(stringRedisTemplate.opsForValue().setIfAbsent(lockKey, timeStamp)){
// 对应setnx命令,可以成功设置,也就是key不存在,获得锁成功
return true;
}
//设置失败,获得锁失败
// 判断锁超时 - 防止原来的操作异常,没有运行解锁操作 ,防止死锁
String currentLock = stringRedisTemplate.opsForValue().get(lockKey);
// 如果锁过期 currentLock不为空且小于当前时间
if(!StringUtils.isEmpty(currentLock) && Long.parseLong(currentLock) < System.currentTimeMillis()){
//如果lockKey对应的锁已经存在,获取上一次设置的时间戳之后并重置lockKey对应的锁的时间戳
String preLock = stringRedisTemplate.opsForValue().getAndSet(lockKey, timeStamp);
//假设两个线程同时进来这里,因为key被占用了,而且锁过期了。
//获取的值currentLock=A(get取的旧的值肯定是一样的),两个线程的timeStamp都是B,key都是K.锁时间已经过期了。
//而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的timeStamp已经变成了B。
//只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
if(!StringUtils.isEmpty(preLock) && preLock.equals(currentLock)){
return true;
}
}
return false;
}
/**
* 释放锁
* @param lockKey
* @param timeStamp
*/
public void release(String lockKey,String timeStamp){
try {
String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
if(!StringUtils.isEmpty(currentValue) && currentValue.equals(timeStamp) ){
// 删除锁状态
stringRedisTemplate.opsForValue().getOperations().delete(lockKey);
}
} catch (Exception e) {
System.out.println("警报!警报!警报!解锁异常");
}
}
}
创建库存Respository
package com.taotao.redisson.respository;
import com.taotao.redisson.entity.GoodsStore;
import io.lettuce.core.dynamic.annotation.Param;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
/**
* 库存Respository
*
*/
public interface GoodsStoreResponsitory extends JpaRepository<GoodsStore,String>{
@Modifying
@Transactional
@Query("update GoodsStore gs set gs.store=gs.store-?2 where gs.code=?1")
int updateStore(@Param("code")String code,@Param("store")Integer store);
}
创建库存接口,定义更新库存和获取库存信息的方法。
package com.taotao.redisson.facade;
import com.taotao.redisson.entity.GoodsStore;
/**
* 创建库存的接口,定义更新库存和获取库存信息的方法
*/
public interface GoodsStoreFacade {
/**
* 根据产品编号更新库存
* @param code
* @param count
* @return
*/
String updateGoodsStore(String code,int count);
/**
* 获取库存的对象
* @param code
* @return
*/
GoodsStore getGoodsStore(String code);
}
实现更新库存接口
package com.taotao.redisson.service;
import com.taotao.redisson.entity.GoodsStore;
import com.taotao.redisson.facade.GoodsStoreFacade;
import com.taotao.redisson.lock.RedisLock;
import com.taotao.redisson.respository.GoodsStoreResponsitory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Optional;
/**
* 库存管理服务
* @author user
*
*/
@Service
public class GoodsStoreService implements GoodsStoreFacade{
@Autowired
private GoodsStoreRespository goodsStoreRespository;
@Autowired
private RedisLock redisLock;
/**
* 超时时间 5s
*/
private static final int TIMEOUT = 5*1000;
/**
* 根据产品编号更新库存
* @param code
* @return
*/
@Override
public String updateGoodsStore(String code,int count) {
//上锁
long time = System.currentTimeMillis() + TIMEOUT;
if(!redisLock.lock(code, String.valueOf(time))){
return "排队人数太多,请稍后再试.";
}
System.out.println("获得锁的时间戳:"+String.valueOf(time));
try {
GoodsStore goodsStore = getGoodsStore(code);
if(goodsStore != null){
if(goodsStore.getStore() <= 0){
return "对不起,卖完了,库存为:"+goodsStore.getStore();
}
if(goodsStore.getStore() < count){
return "对不起,库存不足,库存为:"+goodsStore.getStore()+" 您的购买数量为:"+count;
}
System.out.println("剩余库存:"+goodsStore.getStore());
System.out.println("扣除库存:"+count);
goodsStoreRespository.updateStore(code, count);
try{
//为了更好的测试多线程同时进行库存扣减,在进行数据更新之后先等1秒,让多个线程同时竞争资源
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
return "恭喜您,购买成功!";
}else{
return "获取库存失败。";
}
} finally {
//释放锁
redisLock.release(code, String.valueOf(time));
System.out.println("释放锁的时间戳:"+String.valueOf(time));
}
}
/**
* 获取库存对象
* @param code
* @return
*/
@Override
public GoodsStore getGoodsStore(String code){
Optional<GoodsStore> optional = goodsStoreRespository.findById(code);
return optional.get();
}
}
创建并实现测试的控制器
package com.taotao.redisson.controller;
import com.taotao.redisson.facade.GoodsStoreFacade;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/")
public class TestController {
@Autowired
private GoodsStoreFacade goodsStoreService;
/**
* 进入测试页面
* @param model
* @return
*/
@GetMapping("test")
public ModelAndView stepOne(Model model){
return new ModelAndView("test", "model", model);
}
/**
* 秒杀提交
* @param code
* @param num
* @return
*/
@PostMapping("secKill")
@ResponseBody
public String secKill(@RequestParam(value="code",required=true) String code,@RequestParam(value="num",required=true) Integer num){
String reString = goodsStoreService.updateGoodsStore(code, num);
return reString;
}
}
在static目录创建test.html,实现点击秒杀功能。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="js/jquery-3.4.1.min.js"></script>
</head>
<body>
<button id="btn_seckill">秒杀商品</button>
<div id="count_num"></div>
<div id="result"></div>
<script type="text/javascript">
var countNum = 0;
$("#btn_seckill").click(function(){
var json={"code":"2019053016502800101","num":1};
for(var i = 0 ; i < 400 ; i++){
$.post("secKill",json,function(data){
if(data != "排队人数太多,请稍后再试."){
$("#result").append("<br />" + data + "<br />");
}else{
$("#result").append(data + " ");
}
if(data.indexOf("恭喜您,购买成功") != -1){
countNum += 1;
}
$("#count_num").text("总共卖出:"+countNum);
});
}
});
</script>
</body>
</html>
网友评论