分布式锁需要解决的问题
- 互斥性 (任意时刻只能一个客户端获取锁,防止互相干扰)
- 安全性 (锁只能被持有锁的客户端删除,'解铃还须系铃人')
- 死锁 (获取锁的客户端因为某些原因(:突然宕机)而未能释放锁,锁就一直锁着,其他客户端无法获取该锁,需要有机制避免这种情况发生)
- 容错 (当部分节点,如redis部分节点宕机的时候,客户端能够获取锁和释放锁)
SET key value ([EX seconds] | [PX milliseconds ] )[NX|XX]
- key :定义一个业务key
- value : 唯一的id,保证加锁,解锁的唯一标识(一般可用uuid)
- EX seconds :设置键的过期时间为seconds秒
- PX : 设置键的过期时间 毫秒
- NX: 只有键不存在时,才对键进行设置操作 (key存在,创建失败,不存在创建成功)
- XX : 只有键存在时,才对键进行设置操作
一般使用
public void test(){
String requestId = redisService.lockRetry("锁的Key",避免死锁设置过期时间);
if (requestId == null){
//获取锁异常
return;
}
try {
//执行service业务
}catch (Exception e){
}finally {
redisService.releaseDistributedLock("锁的key",requestId);
}
}
@Slf4j
@Component
public class RedisService<E> {
@Autowired
protected JedisCluster jedis;
/**
* Only set the key if it does not already exist.
* 当key不存在时设置成功,还有XX key存在时设置成功(取反)
*/
private static final String SET_IF_NOT_EXIST = "NX";
/**
* Set the specified expire time, in milliseconds.
* 单位:毫秒
*/
private static final String SET_WITH_EXPIRE_MILL_TIME = "PX";
/**
* Set the specified expire time, in seconds.
* 单位:秒
*/
private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
private static final String LOCK_SUCCESS = "OK";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 默认锁1分钟,根据业务灵活释放
**/
private static final Long LOCK_TIME = 60000L;
private static final String SHOES_CARD_LOCK_KEY= "bsd:shoes:shoes:card:exchange:{userId}";
public String lockRetry(Integer userId){
String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
return lockRetry(key);
}
public void unLock(Integer userId,String requestId){
String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
releaseDistributedLock(key,requestId);
};
/**
*
* @author wangcheng
* @date 2020/07/29
* @param key
* 默认锁释放时间1分钟
* 默认重试次数3次
* @return: java.lang.String
**/
public String lockRetry(String key){
boolean flag ;
String requestId = getRequestId();
try {
for (int i=0;i<3;i++){
flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
if(flag){
return requestId;
}
Thread.sleep(100);
}
}catch (Exception e){
e.printStackTrace();
}
return null;
}
private String getRequestId() {
return UUID.randomUUID().toString();
}
/**
* 设置分布式锁.(过期时间毫秒)
*
* @param lockKey 锁的名称.
* @param requestId 持锁人ID.
* @param expireTime 锁的超时时间.
* @return 是否获取锁.
*/
public synchronized boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁.
*
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功.
*/
public synchronized boolean releaseDistributedLock(String lockKey, String requestId) {
log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
redis常用的api封装
package com.imooc.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
/**
* @author wangcheng
* @date 2020/7/22
*/
public class RedisService {
@Autowired
protected JedisCluster jedis;
private ObjectMapper objectMapper = new ObjectMapper();
/**
* 返回集合成员,其中的集合成员位置按score值递增(从小到大)来排序。.
*
* @param key 交集结果的存储的key
* @param start 元素开始位置
* @param end 元素结束位置
* @return 返回指定位置范围的集合成员, 且成员按sore值递增排序.
*/
public List<String> zrangeWithScores(String key, long start, long end) {
log.info("key = {}, start = {}, end = {}", key, start, end);
Set<Tuple> tupleSet = jedis.zrangeWithScores(key, start, end);
Iterator<Tuple> iterator = tupleSet.iterator();
List<String> retList = new ArrayList<>();
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
retList.add(tuple.getElement());
}
return retList;
}
/**
* 返回集合成员,其中的集合成员位置按score值递增(从大到小)来排序。.
*
* @param key 交集结果的存储的key
* @param start 元素开始位置
* @param end 元素结束位置
* @return 返回指定位置范围的集合成员, 且成员按sore值递减排序.
*/
public List<String> zrevrangeWithScores(String key, long start, long end) {
log.info("key = {}, start = {}, end = {}", key, start, end);
Set<Tuple> tupleSet = jedis.zrevrangeWithScores(key, start, end);
Iterator<Tuple> iterator = tupleSet.iterator();
List<String> retList = new ArrayList<>();
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
retList.add(tuple.getElement());
}
return retList;
}
/**
* 如果该key对应的值是一个Hash表,则返回对应字段的值。 如果不存在该字段,或者key不存在,则返回null
*
* @param key
* @param field
* @return
*/
public String hget(String key, String field) {
log.debug("key:{},field:{}", key, field);
String value = this.jedis.hget(key, field);
log.debug("key:{},field:{},value:{}", key, field, value);
return value == null || "nil".equals(value) ? null : value;
}
/**
* Only set the key if it does not already exist.只有键不存在时,才对键进行设置操作
*/
private static final String SET_IF_NOT_EXIST = "NX";
/**
* Set the specified expire time, in milliseconds.设置时间为毫秒
*/
private static final String SET_WITH_EXPIRE_MILL_TIME = "PX";
/**
* Set the specified expire time, in seconds.设置时间为秒
*/
private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
private static final String LOCK_SUCCESS = "OK";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 默认锁3分钟
**/
private static final Long LOCK_TIME = 180000L;
/**
* 设置分布式锁.(过期时间毫秒)
*
* @param lockKey 锁的名称.
* @param requestId 持锁人ID.
* @param expireTime 锁的超时时间.
* @return 是否获取锁.
*/
public boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁.
*
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功.
*/
public boolean releaseDistributedLock(String lockKey, String requestId) {
log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
public JedisCluster getJedis() {
return jedis;
}
/**
* 对list进行分页封装.
* 不支持循环查询.
*
* @param key
* @param pageNo
* @param pageSize
* @return
*/
public List<String> list(String key, Integer pageNo, Integer pageSize) {
Long length = jedis.llen(key);
int size = pageNo * pageSize;
int remainSize = length.intValue() - size;
if (remainSize >= 0) {
return jedis.lrange(key, (pageNo - 1) * pageSize, size - 1);
} else {
return jedis.lrange(key, (pageNo - 1) * pageSize, length - 1);
}
}
/**
* 支持分页查询redis中的set值.
* 不支持循环查询.
*
* @param key
* @param pageNo
* @param pageSize
* @return
*/
public Set<String> zSet(String key, Integer pageNo, Integer pageSize) {
Long length = jedis.zcard(key);
int size = pageNo * pageSize;
int remainSize = length.intValue() - size;
if (remainSize >= 0) {
return jedis.zrange(key, (pageNo - 1) * pageSize, size - 1);
} else {
return jedis.zrange(key, (pageNo - 1) * pageSize, length - 1);
}
}
/**
* 存储list数据到redis中.
*
* @param key
* @param collection
* @param expireSec
* @return
*/
public boolean saveListJson(String key, Collection<E> collection, Integer expireSec) {
if (!CollectionUtils.isEmpty(collection)) {
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
Object obj = iterator.next();
try {
jedis.lpush(key, objectMapper.writeValueAsString(obj));
} catch (JsonProcessingException e) {
log.error("{}", e.getMessage());
}
}
jedis.expire(key, expireSec);
return true;
}
return false;
}
/**
* 存储set集合到redis.
*
* @param key
* @param collection
* @param expireSec
* @return
*/
public boolean saveZSetJson(String key, Collection<E> collection, String scoreName, Integer expireSec) {
if (!CollectionUtils.isEmpty(collection)) {
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
Object obj = iterator.next();
double score = 0d;
try {
Field field = obj.getClass().getDeclaredField(scoreName);
field.setAccessible(true);
Object value = field.get(obj);
score = Double.valueOf(value.toString());
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("{}", e.getMessage());
}
try {
jedis.zadd(key, score, objectMapper.writeValueAsString(obj));
} catch (JsonProcessingException e) {
log.error("{}", e.getMessage());
}
}
jedis.expire(key, expireSec);
return true;
}
return false;
}
/**
* 对list进行分页封装.
* 支持循环查询.
*
* @param key
* @param pageNo
* @param pageSize
* @return
*/
public List<String> listLoop(String key, Integer pageNo, Integer pageSize) {
Long length = jedis.llen(key);
int size = pageNo * pageSize;
return null;
}
/**
* 获取指定key的jedis.
*
* @param key
*/
private Pipeline getJedisPipeline(String key) {
JedisClusterPipeline jp = (JedisClusterPipeline) jedis;
Jedis jedit = jp.getJedit(key);
return jedit.pipelined();
}
/**
* 使用pipeline批量获取数据.
*
* @param keys
* @return
*/
public List<String> batchGet(String... keys) {
List<Pipeline> pipelines = Stream.of(keys).map(key -> {
Pipeline jedisPipeline = getJedisPipeline(key);
jedisPipeline.get(key);
return jedisPipeline;
}).collect(toList());
return pipelines.stream().flatMap(pipeline -> {
CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> pipeline.syncAndReturnAll());
try {
return future.get(1, TimeUnit.SECONDS).stream();
} catch (Exception e) {
log.error("{}", e.getMessage());
}
return null;
}).map(Object::toString).collect(Collectors.toList());
}
/**
* 分布式锁唯一标识
* @return: java.lang.String
**/
public static String getRequestId(){
return UUID.randomUUID().toString();
}
/**
* 重试机制
* @autor wangcheng
* @param key 锁标识
* @param requestId 客户端标识
* @param timeOut 过期时间
* @param retry 重试次数 默认3次
* @return
*/
public String lockRetry(String key){
Boolean flag = false;
String requestId = getRequestId();
try {
for (int i=0;i<3;i++){
flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
if(flag){
return requestId;
}
Thread.sleep(100);
}
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
-
如果A客户端进来由于某些原因,执行时间过长,超过了锁的过期时间,此时B客户端也能获取锁,此时进来也会攻破
-
如果高并发1秒百千的请求过来如何优化,因为加锁是串行化的,加入一个业务处理20ms,
1000个请求1秒只能请求50个,有没有优化的方法?
网友评论