美文网首页
redis分布式锁api

redis分布式锁api

作者: wang_cheng | 来源:发表于2020-07-22 14:16 被阅读0次
分布式锁需要解决的问题
  • 互斥性 (任意时刻只能一个客户端获取锁,防止互相干扰)
  • 安全性 (锁只能被持有锁的客户端删除,'解铃还须系铃人')
  • 死锁 (获取锁的客户端因为某些原因(:突然宕机)而未能释放锁,锁就一直锁着,其他客户端无法获取该锁,需要有机制避免这种情况发生)
  • 容错 (当部分节点,如redis部分节点宕机的时候,客户端能够获取锁和释放锁)

SET key value ([EX seconds] | [PX milliseconds ] )[NX|XX]

  • key :定义一个业务key
  • value : 唯一的id,保证加锁,解锁的唯一标识(一般可用uuid)
  • EX seconds :设置键的过期时间为seconds秒
  • PX : 设置键的过期时间 毫秒
  • NX: 只有键不存在时,才对键进行设置操作 (key存在,创建失败,不存在创建成功)
  • XX : 只有键存在时,才对键进行设置操作

一般使用

 public void test(){
        String requestId = redisService.lockRetry("锁的Key",避免死锁设置过期时间);
        if (requestId == null){
            //获取锁异常
            return;
        }
        try {
            //执行service业务
        }catch (Exception e){

        }finally {
            redisService.releaseDistributedLock("锁的key",requestId);
        }
    }


@Slf4j
@Component
public class RedisService<E>  {

    @Autowired
    protected JedisCluster jedis;
    
    /**
     * Only set the key if it does not already exist.
     * 当key不存在时设置成功,还有XX key存在时设置成功(取反)
     */
    private static final String SET_IF_NOT_EXIST = "NX";
    /**
     * Set the specified expire time, in milliseconds.
     * 单位:毫秒
     */
    private static final String SET_WITH_EXPIRE_MILL_TIME = "PX";
    /**
     * Set the specified expire time, in seconds.
     * 单位:秒
     */
    private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
    private static final String LOCK_SUCCESS = "OK";
    private static final Long RELEASE_SUCCESS = 1L;


    /**
     * 默认锁1分钟,根据业务灵活释放
     **/
    private static final Long LOCK_TIME = 60000L;

    private static final String SHOES_CARD_LOCK_KEY= "bsd:shoes:shoes:card:exchange:{userId}";


    public String lockRetry(Integer userId){
        String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
        return lockRetry(key);
    }

    public void unLock(Integer userId,String requestId){
        String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
        releaseDistributedLock(key,requestId);
    };
    /**
     *
     * @author wangcheng
     * @date 2020/07/29
     * @param key
     *      默认锁释放时间1分钟
     *      默认重试次数3次
     * @return: java.lang.String
    **/
    public String lockRetry(String key){
        boolean flag ;
        String requestId = getRequestId();
        try {
            for (int i=0;i<3;i++){
                flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
                if(flag){
                    return requestId;
                }
                Thread.sleep(100);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private String getRequestId() {
        return UUID.randomUUID().toString();
    }

    /**
     * 设置分布式锁.(过期时间毫秒)
     *
     * @param lockKey    锁的名称.
     * @param requestId  持锁人ID.
     * @param expireTime 锁的超时时间.
     * @return 是否获取锁.
     */
    public synchronized boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
        log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    /**
     * 释放分布式锁.
     *
     * @param lockKey   锁
     * @param requestId 请求标识
     * @return 是否释放成功.
     */
    public synchronized boolean releaseDistributedLock(String lockKey, String requestId) {
        log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

}

redis常用的api封装
package com.imooc.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

/**
 * @author wangcheng
 * @date 2020/7/22
 */
public class RedisService {




    @Autowired
    protected JedisCluster jedis;

    private ObjectMapper objectMapper = new ObjectMapper();


    /**
     * 返回集合成员,其中的集合成员位置按score值递增(从小到大)来排序。.
     *
     * @param key   交集结果的存储的key
     * @param start 元素开始位置
     * @param end   元素结束位置
     * @return 返回指定位置范围的集合成员, 且成员按sore值递增排序.
     */
    public List<String> zrangeWithScores(String key, long start, long end) {
        log.info("key = {}, start = {}, end = {}", key, start, end);
        Set<Tuple> tupleSet = jedis.zrangeWithScores(key, start, end);
        Iterator<Tuple> iterator = tupleSet.iterator();
        List<String> retList = new ArrayList<>();
        while (iterator.hasNext()) {
            Tuple tuple = iterator.next();
            retList.add(tuple.getElement());
        }
        return retList;
    }

    /**
     * 返回集合成员,其中的集合成员位置按score值递增(从大到小)来排序。.
     *
     * @param key   交集结果的存储的key
     * @param start 元素开始位置
     * @param end   元素结束位置
     * @return 返回指定位置范围的集合成员, 且成员按sore值递减排序.
     */
    public List<String> zrevrangeWithScores(String key, long start, long end) {
        log.info("key = {}, start = {}, end = {}", key, start, end);
        Set<Tuple> tupleSet = jedis.zrevrangeWithScores(key, start, end);
        Iterator<Tuple> iterator = tupleSet.iterator();
        List<String> retList = new ArrayList<>();
        while (iterator.hasNext()) {
            Tuple tuple = iterator.next();
            retList.add(tuple.getElement());
        }
        return retList;
    }

    /**
     * 如果该key对应的值是一个Hash表,则返回对应字段的值。 如果不存在该字段,或者key不存在,则返回null
     *
     * @param key
     * @param field
     * @return
     */
    public String hget(String key, String field) {
        log.debug("key:{},field:{}", key, field);
        String value = this.jedis.hget(key, field);
        log.debug("key:{},field:{},value:{}", key, field, value);
        return value == null || "nil".equals(value) ? null : value;
    }

    /**
     * Only set the key if it does not already exist.只有键不存在时,才对键进行设置操作
     */
    private static final String SET_IF_NOT_EXIST = "NX";
    /**
     * Set the specified expire time, in milliseconds.设置时间为毫秒
     */
    private static final String SET_WITH_EXPIRE_MILL_TIME = "PX"; 
    /**
     * Set the specified expire time, in seconds.设置时间为秒
     */
    private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
    private static final String LOCK_SUCCESS = "OK";
    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 默认锁3分钟
     **/
    private static final Long LOCK_TIME = 180000L;

    /**
     * 设置分布式锁.(过期时间毫秒)
     *
     * @param lockKey    锁的名称.
     * @param requestId  持锁人ID.
     * @param expireTime 锁的超时时间.
     * @return 是否获取锁.
     */
    public boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
        log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    /**
     * 释放分布式锁.
     *
     * @param lockKey   锁
     * @param requestId 请求标识
     * @return 是否释放成功.
     */
    public boolean releaseDistributedLock(String lockKey, String requestId) {
        log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    public JedisCluster getJedis() {
        return jedis;
    }

    /**
     * 对list进行分页封装.
     * 不支持循环查询.
     *
     * @param key
     * @param pageNo
     * @param pageSize
     * @return
     */
    public List<String> list(String key, Integer pageNo, Integer pageSize) {
        Long length = jedis.llen(key);
        int size = pageNo * pageSize;
        int remainSize = length.intValue() - size;
        if (remainSize >= 0) {
            return jedis.lrange(key, (pageNo - 1) * pageSize, size - 1);
        } else {
            return jedis.lrange(key, (pageNo - 1) * pageSize, length - 1);
        }
    }

    /**
     * 支持分页查询redis中的set值.
     * 不支持循环查询.
     *
     * @param key
     * @param pageNo
     * @param pageSize
     * @return
     */
    public Set<String> zSet(String key, Integer pageNo, Integer pageSize) {
        Long length = jedis.zcard(key);
        int size = pageNo * pageSize;
        int remainSize = length.intValue() - size;
        if (remainSize >= 0) {
            return jedis.zrange(key, (pageNo - 1) * pageSize, size - 1);
        } else {
            return jedis.zrange(key, (pageNo - 1) * pageSize, length - 1);
        }
    }

    /**
     * 存储list数据到redis中.
     *
     * @param key
     * @param collection
     * @param expireSec
     * @return
     */
    public boolean saveListJson(String key, Collection<E> collection, Integer expireSec) {
        if (!CollectionUtils.isEmpty(collection)) {
            Iterator iterator = collection.iterator();
            while (iterator.hasNext()) {
                Object obj = iterator.next();
                try {
                    jedis.lpush(key, objectMapper.writeValueAsString(obj));
                } catch (JsonProcessingException e) {
                    log.error("{}", e.getMessage());
                }
            }
            jedis.expire(key, expireSec);
            return true;
        }
        return false;
    }

    /**
     * 存储set集合到redis.
     *
     * @param key
     * @param collection
     * @param expireSec
     * @return
     */
    public boolean saveZSetJson(String key, Collection<E> collection, String scoreName, Integer expireSec) {
        if (!CollectionUtils.isEmpty(collection)) {
            Iterator iterator = collection.iterator();
            while (iterator.hasNext()) {
                Object obj = iterator.next();
                double score = 0d;
                try {
                    Field field = obj.getClass().getDeclaredField(scoreName);
                    field.setAccessible(true);
                    Object value = field.get(obj);
                    score = Double.valueOf(value.toString());
                } catch (NoSuchFieldException | IllegalAccessException e) {
                    log.error("{}", e.getMessage());
                }
                try {
                    jedis.zadd(key, score, objectMapper.writeValueAsString(obj));
                } catch (JsonProcessingException e) {
                    log.error("{}", e.getMessage());
                }
            }
            jedis.expire(key, expireSec);
            return true;
        }
        return false;
    }


    /**
     * 对list进行分页封装.
     * 支持循环查询.
     *
     * @param key
     * @param pageNo
     * @param pageSize
     * @return
     */
    public List<String> listLoop(String key, Integer pageNo, Integer pageSize) {
        Long length = jedis.llen(key);
        int size = pageNo * pageSize;
        return null;
    }

    /**
     * 获取指定key的jedis.
     *
     * @param key
     */
    private Pipeline getJedisPipeline(String key) {
        JedisClusterPipeline jp = (JedisClusterPipeline) jedis;
        Jedis jedit = jp.getJedit(key);
        return jedit.pipelined();
    }

    /**
     * 使用pipeline批量获取数据.
     *
     * @param keys
     * @return
     */
    public List<String> batchGet(String... keys) {
        List<Pipeline> pipelines = Stream.of(keys).map(key -> {
            Pipeline jedisPipeline = getJedisPipeline(key);
            jedisPipeline.get(key);
            return jedisPipeline;
        }).collect(toList());
        return pipelines.stream().flatMap(pipeline -> {
            CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> pipeline.syncAndReturnAll());
            try {
                return future.get(1, TimeUnit.SECONDS).stream();
            } catch (Exception e) {
                log.error("{}", e.getMessage());
            }
            return null;
        }).map(Object::toString).collect(Collectors.toList());
    }

    /**
     * 分布式锁唯一标识
     * @return: java.lang.String
     **/
    public static String getRequestId(){
        return UUID.randomUUID().toString();
    }

    /**
     * 重试机制
     * @autor wangcheng
     * @param key 锁标识
     * @param requestId 客户端标识
     * @param timeOut 过期时间
     * @param retry 重试次数 默认3次
     * @return
     */
    public String lockRetry(String key){
        Boolean flag = false;
        String requestId = getRequestId();
        try {
            for (int i=0;i<3;i++){
                flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
                if(flag){
                    return requestId;
                }
                Thread.sleep(100);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }

}

  1. 如果A客户端进来由于某些原因,执行时间过长,超过了锁的过期时间,此时B客户端也能获取锁,此时进来也会攻破

  2. 如果高并发1秒百千的请求过来如何优化,因为加锁是串行化的,加入一个业务处理20ms,
    1000个请求1秒只能请求50个,有没有优化的方法?

相关文章

网友评论

      本文标题:redis分布式锁api

      本文链接:https://www.haomeiwen.com/subject/acozkktx.html