美文网首页我爱编程
Redis分布式锁的实现

Redis分布式锁的实现

作者: ggr | 来源:发表于2018-04-15 18:37 被阅读0次

    Java 并发包里面以及你给提供了很多方便的工具来帮助我们充分发挥单个节点上面的多线程并发处理能力,但是随着并发量的增大,显然这种单个节点已经无法满足我们的业务量了。
    所以分布式应运而生,通过集群来拓展我们的系统处理能力,这个时候,如何对多个节点,不同的JVM进行并发控制成为了关键性的问题。

    一般会采用分布式锁来做这种同步处理,而这种基于锁的解决方案其实有很多,一般我们用redis或者zookeeper来做。

    下面介绍Redis下面的分布式锁的实现:主要通过redis提供的一些命令来控制。可以基于文件目录,也可以基于某个key。
    HSETNX key field value
    将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在。
      若域 field 已经存在,该操作无效。
      如果 key 不存在,一个新哈希表被创建并执行 HSETNX 命令。

    HSETNX 带有set if not exist 语义的命令,设置成功,返回 1 。
    如果给定域已经存在且没有操作被执行,返回 0 。
    表示获取了锁,如果失败说明锁被其他节点占用了。所有带有set if not exist 语义的命令都可以。

    下面是实现的例子:
    定义锁的接口:

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    public interface RedisLockService {
        boolean tryLock(String var1);
    
        boolean tryLock(String var1, long var2, TimeUnit var4);
    
        boolean tryLock(String var1, long var2, long var3, TimeUnit var4);
    
        boolean lock(String var1);
    
        void unLock(String var1);
    
        boolean tryLock(List<String> var1);
    
        boolean tryLock(List<String> var1, long var2, TimeUnit var4);
    
        void unLock(List<String> var1);
    
        boolean lockLongTime(String var1);
    }
    

    使用redis实现

    import com.weijuju.iag.common.redis.JedisCallable;
    import com.weijuju.iag.common.redis.RedisExecutor;
    import com.weijuju.iag.sz.demo.service.RedisLockService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    @Service
    public class RedisLockEnhanceImpl implements RedisLockService {
    
        @Autowired
        @Qualifier("iagCoreRedisExecutor")
        private RedisExecutor redisExecutor;
    
        //成功获取锁标志
        private static final String LOCK_SUCCES = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
    
        @Override
        public boolean tryLock(final String key) {
            return this.tryLock(key, 0L, 300, (TimeUnit) null);
        }
    
        @Override
        public boolean tryLock(final String key, final long timeout, final TimeUnit timeUnit) {
            return this.tryLock(key, timeout, 300, (TimeUnit) null);
        }
    
        @Override
        public boolean tryLock(final String key, final long timeout, final long period, final TimeUnit timeUnit) {
            JedisCallable<Boolean> jedisCallable = new JedisCallable<Boolean>() {
                @Override
                public Boolean call(Jedis jedis) throws Exception {
                    long nano = System.nanoTime();
                    do {
                        /**
                         * 采用setNx+expire的确可以实现,但是注意这里要做好控制setNx+expire方法之间如果存在空隙很可能就存在bug
                         *
                         *  if(LOCK_SUCCES.equals(jedis.setNx(key,key)) && jedis.expire(key,5)==1){...}
                         */
                        if (LOCK_SUCCES.equals(jedis.set(key, key, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, 5))) {
                           log.info("get lock, key: " + key + " , expire in " + 5 + " seconds.");
                            return Boolean.TRUE;
                        }
    
                        String desc = jedis.get(key);
                        log.info("key: " + key + " locked by another business:" + desc);
                        if (timeout == 0) break;
    
                        /**根据具体情况来看,如果获取锁后要做的事情比较多,也就是锁占用锁的时间会比较长,就使用Thread.sleep(..)
                         * 如果时间很短,可以考虑让其cpu自旋,因为线程的状态切换要是要花时间的,这里考虑到开了5000毫秒的时间来占用的话
                         * 可以直接让其sleep一下
                         */
                        Thread.sleep(period);
                    } while (System.nanoTime() - nano < timeUnit.toNanos(timeout));
    
                    return Boolean.FALSE;
                }
            };
            return redisExecutor.doInRedis(jedisCallable).booleanValue();
        }
    
        /**
         * 自旋超过MAX_VALUE考虑直接返回失败
         * 避免僵尸线程
         *
         * @param key
         * @return
         */
        @Override
        public boolean lock(final String key) {
            JedisCallable<Boolean> jedisCallable = new JedisCallable<Boolean>() {
                @Override
                public Boolean call(Jedis jedis) throws Exception {
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        if (LOCK_SUCCES.equals(jedis.set(key, key, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, 5))) {
                            log.info("get lock, key: " + key + " , expire in " + 5 + " seconds.");
                            return Boolean.TRUE;
                        }
                        String desc = jedis.get(key);
                        log.info("key: " + key + " locked by another business:" + desc);
                    }
                    throw new Exception("加锁超时!");
                }
            };
            return redisExecutor.doInRedis(jedisCallable).booleanValue();
        }
    
        @Override
        public void unLock(final String key) {
            List<String> list = new ArrayList<String>();
            list.add(key);
            this.unLock(list);
        }
    
        @Override
        public boolean tryLock(final List<String> keyList) {
            return this.tryLock(keyList, 0L, (TimeUnit) null);
        }
    
        @Override
        public boolean tryLock(final List<String> keyList, final long timeout, final TimeUnit timeUnit) {
            JedisCallable<Boolean> callable = new JedisCallable<Boolean>() {
                public Boolean call(Jedis instance) throws Exception {
                    List<String> needLocking = new CopyOnWriteArrayList();
                    List<String> locked = new CopyOnWriteArrayList();
                    long nano = System.nanoTime();
    
                    do {
                        Pipeline pipeline = instance.pipelined();
                        Iterator i$ = keyList.iterator();
    
                        while (i$.hasNext()) {
                            String identify = (String) i$.next();
                            needLocking.add(identify);
                            pipeline.set(identify, identify, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, 5);
                        }
    
                        log.debug("try lock keys: " + needLocking);
                        List<Object> results = pipeline.syncAndReturnAll();
    
                        for (int i = 0; i < results.size(); ++i) {
                            Long result = (Long) results.get(i);
                            String key = (String) needLocking.get(i);
                            if (result.longValue() == 1L) {
                                locked.add(key);
                            }
                        }
                        //过滤已经加锁成功的
                        needLocking.removeAll(locked);
                        if (CollectionUtils.isEmpty(needLocking)) {
                            return Boolean.valueOf(true);
                        }
    
                        log.debug("keys: " + needLocking + " locked by another business:");
                        if (timeout == 0L) {
                            break;
                        }
                        Thread.sleep(500L);
                    } while (System.nanoTime() - nano < timeUnit.toNanos(timeout));
    
                    /**
                     * 超时回滚
                     */
                    if (!CollectionUtils.isEmpty(locked)) {
                        instance.del((String[]) locked.toArray(new String[0]));
                    }
                    return Boolean.valueOf(false);
                }
            };
            return ((Boolean) this.redisExecutor.doInRedis(callable)).booleanValue();
        }
    
    
        @Override
        public void unLock(final List<String> keyList) {
            JedisCallable<Boolean> callable = new JedisCallable<Boolean>() {
                public Boolean call(Jedis jedis) throws Exception {
                    List<String> keys = new CopyOnWriteArrayList();
                    Iterator i$ = keyList.iterator();
    
                    while (i$.hasNext()) {
                        String key = (String) i$.next();
                        keys.add(key);
                    }
                    try {
                        /**
                         * 当且仅当keys里面的数据全部被del掉才能算解锁完全
                         */
                        if ((keys.size() == jedis.del((String[]) keys.toArray(new String[0])))) {
                            log.debug("release lock, keys :" + keys);
                            return Boolean.valueOf(true);
                        }
    
                    } catch (Exception var5) {
                        log.error(var5.getMessage(), var5);
                    }
                    return Boolean.valueOf(false);
                }
            };
            this.redisExecutor.doInRedis(callable);
        }
    
        @Override
        public boolean lockLongTime(String key) {
            return this.tryLock(key, 30, 100, (TimeUnit) null);
        }
    }
    

    相关文章

      网友评论

        本文标题:Redis分布式锁的实现

        本文链接:https://www.haomeiwen.com/subject/cqudkftx.html