美文网首页
redis分布式锁api

redis分布式锁api

作者: wang_cheng | 来源:发表于2020-07-22 14:16 被阅读0次
    分布式锁需要解决的问题
    • 互斥性 (任意时刻只能一个客户端获取锁,防止互相干扰)
    • 安全性 (锁只能被持有锁的客户端删除,'解铃还须系铃人')
    • 死锁 (获取锁的客户端因为某些原因(:突然宕机)而未能释放锁,锁就一直锁着,其他客户端无法获取该锁,需要有机制避免这种情况发生)
    • 容错 (当部分节点,如redis部分节点宕机的时候,客户端能够获取锁和释放锁)

    SET key value ([EX seconds] | [PX milliseconds ] )[NX|XX]

    • key :定义一个业务key
    • value : 唯一的id,保证加锁,解锁的唯一标识(一般可用uuid)
    • EX seconds :设置键的过期时间为seconds秒
    • PX : 设置键的过期时间 毫秒
    • NX: 只有键不存在时,才对键进行设置操作 (key存在,创建失败,不存在创建成功)
    • XX : 只有键存在时,才对键进行设置操作

    一般使用

     public void test(){
            String requestId = redisService.lockRetry("锁的Key",避免死锁设置过期时间);
            if (requestId == null){
                //获取锁异常
                return;
            }
            try {
                //执行service业务
            }catch (Exception e){
    
            }finally {
                redisService.releaseDistributedLock("锁的key",requestId);
            }
        }
    
    
    
    @Slf4j
    @Component
    public class RedisService<E>  {
    
        @Autowired
        protected JedisCluster jedis;
        
        /**
         * Only set the key if it does not already exist.
         * 当key不存在时设置成功,还有XX key存在时设置成功(取反)
         */
        private static final String SET_IF_NOT_EXIST = "NX";
        /**
         * Set the specified expire time, in milliseconds.
         * 单位:毫秒
         */
        private static final String SET_WITH_EXPIRE_MILL_TIME = "PX";
        /**
         * Set the specified expire time, in seconds.
         * 单位:秒
         */
        private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
        private static final String LOCK_SUCCESS = "OK";
        private static final Long RELEASE_SUCCESS = 1L;
    
    
        /**
         * 默认锁1分钟,根据业务灵活释放
         **/
        private static final Long LOCK_TIME = 60000L;
    
        private static final String SHOES_CARD_LOCK_KEY= "bsd:shoes:shoes:card:exchange:{userId}";
    
    
        public String lockRetry(Integer userId){
            String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
            return lockRetry(key);
        }
    
        public void unLock(Integer userId,String requestId){
            String key = RedisKey.replaceKey(SHOES_CARD_LOCK_KEY, userId);
            releaseDistributedLock(key,requestId);
        };
        /**
         *
         * @author wangcheng
         * @date 2020/07/29
         * @param key
         *      默认锁释放时间1分钟
         *      默认重试次数3次
         * @return: java.lang.String
        **/
        public String lockRetry(String key){
            boolean flag ;
            String requestId = getRequestId();
            try {
                for (int i=0;i<3;i++){
                    flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
                    if(flag){
                        return requestId;
                    }
                    Thread.sleep(100);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            return null;
        }
    
        private String getRequestId() {
            return UUID.randomUUID().toString();
        }
    
        /**
         * 设置分布式锁.(过期时间毫秒)
         *
         * @param lockKey    锁的名称.
         * @param requestId  持锁人ID.
         * @param expireTime 锁的超时时间.
         * @return 是否获取锁.
         */
        public synchronized boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
            log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
        /**
         * 释放分布式锁.
         *
         * @param lockKey   锁
         * @param requestId 请求标识
         * @return 是否释放成功.
         */
        public synchronized boolean releaseDistributedLock(String lockKey, String requestId) {
            log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
    }
    
    
    redis常用的api封装
    package com.imooc.service;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.util.CollectionUtils;
    
    import java.lang.reflect.Field;
    import java.util.*;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    import static java.util.stream.Collectors.toList;
    
    /**
     * @author wangcheng
     * @date 2020/7/22
     */
    public class RedisService {
    
    
    
    
        @Autowired
        protected JedisCluster jedis;
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
    
        /**
         * 返回集合成员,其中的集合成员位置按score值递增(从小到大)来排序。.
         *
         * @param key   交集结果的存储的key
         * @param start 元素开始位置
         * @param end   元素结束位置
         * @return 返回指定位置范围的集合成员, 且成员按sore值递增排序.
         */
        public List<String> zrangeWithScores(String key, long start, long end) {
            log.info("key = {}, start = {}, end = {}", key, start, end);
            Set<Tuple> tupleSet = jedis.zrangeWithScores(key, start, end);
            Iterator<Tuple> iterator = tupleSet.iterator();
            List<String> retList = new ArrayList<>();
            while (iterator.hasNext()) {
                Tuple tuple = iterator.next();
                retList.add(tuple.getElement());
            }
            return retList;
        }
    
        /**
         * 返回集合成员,其中的集合成员位置按score值递增(从大到小)来排序。.
         *
         * @param key   交集结果的存储的key
         * @param start 元素开始位置
         * @param end   元素结束位置
         * @return 返回指定位置范围的集合成员, 且成员按sore值递减排序.
         */
        public List<String> zrevrangeWithScores(String key, long start, long end) {
            log.info("key = {}, start = {}, end = {}", key, start, end);
            Set<Tuple> tupleSet = jedis.zrevrangeWithScores(key, start, end);
            Iterator<Tuple> iterator = tupleSet.iterator();
            List<String> retList = new ArrayList<>();
            while (iterator.hasNext()) {
                Tuple tuple = iterator.next();
                retList.add(tuple.getElement());
            }
            return retList;
        }
    
        /**
         * 如果该key对应的值是一个Hash表,则返回对应字段的值。 如果不存在该字段,或者key不存在,则返回null
         *
         * @param key
         * @param field
         * @return
         */
        public String hget(String key, String field) {
            log.debug("key:{},field:{}", key, field);
            String value = this.jedis.hget(key, field);
            log.debug("key:{},field:{},value:{}", key, field, value);
            return value == null || "nil".equals(value) ? null : value;
        }
    
        /**
         * Only set the key if it does not already exist.只有键不存在时,才对键进行设置操作
         */
        private static final String SET_IF_NOT_EXIST = "NX";
        /**
         * Set the specified expire time, in milliseconds.设置时间为毫秒
         */
        private static final String SET_WITH_EXPIRE_MILL_TIME = "PX"; 
        /**
         * Set the specified expire time, in seconds.设置时间为秒
         */
        private static final String SET_WITH_EXPIRE_SECOND_TIME = "EX";
        private static final String LOCK_SUCCESS = "OK";
        private static final Long RELEASE_SUCCESS = 1L;
    
        /**
         * 默认锁3分钟
         **/
        private static final Long LOCK_TIME = 180000L;
    
        /**
         * 设置分布式锁.(过期时间毫秒)
         *
         * @param lockKey    锁的名称.
         * @param requestId  持锁人ID.
         * @param expireTime 锁的超时时间.
         * @return 是否获取锁.
         */
        public boolean tryGetDistributeLock(String lockKey, String requestId, long expireTime) {
            log.debug("lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId, expireTime);
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_MILL_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
        /**
         * 释放分布式锁.
         *
         * @param lockKey   锁
         * @param requestId 请求标识
         * @return 是否释放成功.
         */
        public boolean releaseDistributedLock(String lockKey, String requestId) {
            log.debug("release lock key: {}, ownerId: {}, expire time: {}", lockKey, requestId);
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
        public JedisCluster getJedis() {
            return jedis;
        }
    
        /**
         * 对list进行分页封装.
         * 不支持循环查询.
         *
         * @param key
         * @param pageNo
         * @param pageSize
         * @return
         */
        public List<String> list(String key, Integer pageNo, Integer pageSize) {
            Long length = jedis.llen(key);
            int size = pageNo * pageSize;
            int remainSize = length.intValue() - size;
            if (remainSize >= 0) {
                return jedis.lrange(key, (pageNo - 1) * pageSize, size - 1);
            } else {
                return jedis.lrange(key, (pageNo - 1) * pageSize, length - 1);
            }
        }
    
        /**
         * 支持分页查询redis中的set值.
         * 不支持循环查询.
         *
         * @param key
         * @param pageNo
         * @param pageSize
         * @return
         */
        public Set<String> zSet(String key, Integer pageNo, Integer pageSize) {
            Long length = jedis.zcard(key);
            int size = pageNo * pageSize;
            int remainSize = length.intValue() - size;
            if (remainSize >= 0) {
                return jedis.zrange(key, (pageNo - 1) * pageSize, size - 1);
            } else {
                return jedis.zrange(key, (pageNo - 1) * pageSize, length - 1);
            }
        }
    
        /**
         * 存储list数据到redis中.
         *
         * @param key
         * @param collection
         * @param expireSec
         * @return
         */
        public boolean saveListJson(String key, Collection<E> collection, Integer expireSec) {
            if (!CollectionUtils.isEmpty(collection)) {
                Iterator iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Object obj = iterator.next();
                    try {
                        jedis.lpush(key, objectMapper.writeValueAsString(obj));
                    } catch (JsonProcessingException e) {
                        log.error("{}", e.getMessage());
                    }
                }
                jedis.expire(key, expireSec);
                return true;
            }
            return false;
        }
    
        /**
         * 存储set集合到redis.
         *
         * @param key
         * @param collection
         * @param expireSec
         * @return
         */
        public boolean saveZSetJson(String key, Collection<E> collection, String scoreName, Integer expireSec) {
            if (!CollectionUtils.isEmpty(collection)) {
                Iterator iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Object obj = iterator.next();
                    double score = 0d;
                    try {
                        Field field = obj.getClass().getDeclaredField(scoreName);
                        field.setAccessible(true);
                        Object value = field.get(obj);
                        score = Double.valueOf(value.toString());
                    } catch (NoSuchFieldException | IllegalAccessException e) {
                        log.error("{}", e.getMessage());
                    }
                    try {
                        jedis.zadd(key, score, objectMapper.writeValueAsString(obj));
                    } catch (JsonProcessingException e) {
                        log.error("{}", e.getMessage());
                    }
                }
                jedis.expire(key, expireSec);
                return true;
            }
            return false;
        }
    
    
        /**
         * 对list进行分页封装.
         * 支持循环查询.
         *
         * @param key
         * @param pageNo
         * @param pageSize
         * @return
         */
        public List<String> listLoop(String key, Integer pageNo, Integer pageSize) {
            Long length = jedis.llen(key);
            int size = pageNo * pageSize;
            return null;
        }
    
        /**
         * 获取指定key的jedis.
         *
         * @param key
         */
        private Pipeline getJedisPipeline(String key) {
            JedisClusterPipeline jp = (JedisClusterPipeline) jedis;
            Jedis jedit = jp.getJedit(key);
            return jedit.pipelined();
        }
    
        /**
         * 使用pipeline批量获取数据.
         *
         * @param keys
         * @return
         */
        public List<String> batchGet(String... keys) {
            List<Pipeline> pipelines = Stream.of(keys).map(key -> {
                Pipeline jedisPipeline = getJedisPipeline(key);
                jedisPipeline.get(key);
                return jedisPipeline;
            }).collect(toList());
            return pipelines.stream().flatMap(pipeline -> {
                CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> pipeline.syncAndReturnAll());
                try {
                    return future.get(1, TimeUnit.SECONDS).stream();
                } catch (Exception e) {
                    log.error("{}", e.getMessage());
                }
                return null;
            }).map(Object::toString).collect(Collectors.toList());
        }
    
        /**
         * 分布式锁唯一标识
         * @return: java.lang.String
         **/
        public static String getRequestId(){
            return UUID.randomUUID().toString();
        }
    
        /**
         * 重试机制
         * @autor wangcheng
         * @param key 锁标识
         * @param requestId 客户端标识
         * @param timeOut 过期时间
         * @param retry 重试次数 默认3次
         * @return
         */
        public String lockRetry(String key){
            Boolean flag = false;
            String requestId = getRequestId();
            try {
                for (int i=0;i<3;i++){
                    flag = tryGetDistributeLock(key,requestId,LOCK_TIME);
                    if(flag){
                        return requestId;
                    }
                    Thread.sleep(100);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            return null;
        }
    
    }
    

    1. 如果A客户端进来由于某些原因,执行时间过长,超过了锁的过期时间,此时B客户端也能获取锁,此时进来也会攻破

    2. 如果高并发1秒百千的请求过来如何优化,因为加锁是串行化的,加入一个业务处理20ms,
      1000个请求1秒只能请求50个,有没有优化的方法?

    相关文章

      网友评论

          本文标题:redis分布式锁api

          本文链接:https://www.haomeiwen.com/subject/acozkktx.html