美文网首页
利用redis实现简单的分布式锁

利用redis实现简单的分布式锁

作者: millions_chan | 来源:发表于2017-01-06 19:57 被阅读669次

    导言


    后端程序员多多少少都与各种各样的“锁”打过交道,如: 电商网站在下单时对某个商品减库存,必须加锁防止多个客户同时下单导致库存量错误; 抢分享红包时必须对红包加锁防止用户抢到的金额超过红包上限等。

    对于单体或部署在单台服务器上的应用而言,对某种资源加锁相对简单,使用语言本身的并发控制机制或是文件锁即可实现。然而为了提高响应速度以及防止单点,当前的应用常常会在多台服务器上部署多个实例,这时就需要进行分布式的并发控制了。

    一种实现分布式锁的思路利用了redis的setnx命令,除了这种思路以外,Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。由于redis是单线程的,单次执行的lua脚本可以认为是原子的,redis的java客户端redisson就利用了这一特性实现了分布式锁。

    redisson的加锁原理


    redisson加锁的逻辑在这里,如果释放锁的时间不是-1,则直接试着加锁,否则认为该锁的超时时间为无穷大,在成功获得锁之后每隔一段时间刷新锁的过期时间。

       private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, 
        TimeUnit unit,
        final long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime,
                  unit,
                  threadId, 
                  RedisCommands.EVAL_NULL_BOOLEAN);
            }
            RFuture<Boolean> ttlRemainingFuture 
                      = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
                                 TimeUnit.SECONDS, 
                                 threadId, 
                                 RedisCommands.EVAL_NULL_BOOLEAN);
    
            ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
    
                    Boolean ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    

    加锁的过程主要是靠下面这段lua脚本:

        <T> RFuture<T> tryLockInnerAsync(long leaseTime, 
               TimeUnit unit,
               long threadId, 
               RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(),
                      LongCodec.INSTANCE, 
                      command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                      Collections.<Object>singletonList(getName()), 
                      internalLockLeaseTime,
                      getLockName(threadId));
        }
    

    上面的KEYS[1]为加锁时指定的锁的名称,argv[1]是internalLockLeaseTime,argv[2]是thread_id。这段逻辑为:如果锁不存在,则hset 锁名称 线程id 1,直接返回。 如果hexists 锁名称 线程id返回1,则说明该客户端已经加过锁了,则将自己线程id对应的值增加1,并重新设置该key的超时时间,实现可重入。否则说明该锁已经被其他客户端获取,返回锁还有多久过期。如果加锁失败,客户端可以选择利用redis的pub/sub机制监听一个channel,在监听到锁被释放后尝试重新加锁,或者返会失败让客户端决定下一步操作。

    解锁时KEY[1]为锁名称,KEY[2]为关注的队列名称,ARGV[1]为解锁消息,ARGV[2]为超时时间,ARGV[3]为线程:

       protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(),
                      LongCodec.INSTANCE,
                      RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), 
                               getChannelName()),
                               LockPubSub.unlockMessage, 
                               internalLockLeaseTime, getLockName(threadId));
        }
    

    解锁的时候如果锁已经超时,则通知其他等待获取该锁的客户端并返回1。如果该锁没有被当前客户端获得,则什么都不做直接返回null。如果当期客户端拥有锁,则为了可重入,先减少一次当前线程获取锁的次数,如果unlock次数小于lock次数,则重新设置锁的超时时间,返回0,如果锁已经被彻底释放,则删除掉锁,并通知其他等待获取锁的客户端,并返回1表示解锁成功。

    总体而言,redisson的锁防止了由于客户端死掉而导致的锁永久被占用,还支持可重入锁,功能上还是比较完备的。

    封装redisson


    redisson的分布式锁从功能上说已经够用了,为了方便使用以及与spring集成,我们需要再将其做一次封装,由于redis有集群、单机、sentinel几种模式,我们将加锁功能抽象起来,提供统一的接口。话不多说,我们之间上代码:

    /** 
      * Created by millions on 2017/1/6.
     * 分布式锁 
     */
    public interface DistributedLock {    
    /**    
      * 使用分布式锁,获取锁后执行callback指定的业务逻辑,使用锁默认超时时间。     
      * 
      * @param callback 完成回调     
      * @return  
      */ 
       <T> T lock(DistributedLockCallback<T> callback);  
    
      /**
        * 使用分布式锁,获取锁后执行callback指定的业务逻辑,使用锁默认超时时间。     
        *
        * @param callback  
        * @param leaseTime 锁超时时间。超时后自动释放锁。   
        * @param timeUnit 超时时间单位    
        * @return
        */
        <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit);
    }
    

    我们将加锁后需要的逻辑封装到callback中:

    /** * Created by millions on 2017/1/6. * 分布式锁回调接口 */
    public interface DistributedLockCallback<T> { 
       T process();   
       String getLockName();
    }
    

    实现单机redis的分布式锁:

    public class SingleDistributedLock implements DistributedLock {
        private static final long DEFAULT_TIMEOUT = 5;
        private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
        private RedissonClient redissonClient;
    
        public SingleDistributedLock(RedissonClient redissonClient) {
            this.redissonClient = redissonClient;
        }
    
        public SingleDistributedLock() {
        }
    
        public <T> T lock(DistributedLockCallback<T> callback) {
            return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT);
        }
    
        public <T> T lock(DistributedLockCallback<T> callback, long leaseTime,
            TimeUnit timeUnit) {
            RLock lock = null;
    
            try {
                lock = redissonClient.getLock(callback.getLockName());
                lock.lock(leaseTime, timeUnit);
    
                return callback.process();
            } finally {
                if (null != lock) {
                    lock.unlock();
                }
            }
        }
    
        public void setRedissonClient(RedissonClient redissonClient) {
            this.redissonClient = redissonClient;
        }
    }
    

    为了方便的和spring整合,我们实现spring的FactoryBean接口

    public class DistributedLockFactoryBean implements FactoryBean<DistributedLock> {
        private static final Logger logger 
         = LoggerFactory.getLogger(DistributedLockFactoryBean.class);
    
        private RedissonClient client;
    
        private LockModel lockModel;
    
        private DistributedLock lock;
    
        public DistributedLockFactoryBean(String modelName) {
            lockModel = LockModel.fromString(modelName);
        }
    
        @PostConstruct
        public void init() {
            logger.debug("DistributedLockFactoryBean is initializing!");
            //从classpath下读取RedissionClient配置
            InputStream inputStream = null;
            Config config = null;
            try {
                inputStream = DistributedLockFactoryBean
                .class
                .getClassLoader()
                .getResourceAsStream("redis-config.json");
                config = Config.fromJSON(inputStream);
            } catch (IOException e) {
                logger.error("读取redission配置失败");
            } finally {
                if (null != inputStream) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        logger.info("", e);
                    }
                }
            }
            Assert.notNull(config);
            //初始化client
            client = Redisson.create(config);
        }
    
        @PreDestroy
        public void destroy() {
            logger.debug("DistributedLockFactoryBean will be destroyed!");
            client.shutdown();
        }
    
        public DistributedLock getObject() throws Exception {
            Class<? extends DistributedLock> clz = lockModel.lockClass();
            lock = clz.newInstance();
            Method setRedissonClient
                = clz.getMethod("setRedissonClient", RedissonClient.class);
            if (null != setRedissonClient) {
                setRedissonClient.invoke(lock, client);
                return lock;
            }
            throw new InvalidObjectException(clz.getSimpleName()+"不支持的RedissonClient");
        }
    
        public Class<?> getObjectType() {
            return DistributedLock.class;
        }
    
        public boolean isSingleton() {
            return true;
        }
    
        private enum LockModel {
            SINGLE {
                Class<? extends DistributedLock> lockClass() {
                    return SingleDistributedLock.class;
                }
            };
    
            abstract Class<? extends DistributedLock> lockClass();
    
            public static LockModel fromString(String modelName) {
                LockModel[] values = LockModel.values();
                for (LockModel model : values) {
                    if (model.name().equalsIgnoreCase(modelName)) {
                        return model;
                    }
                }
                throw new InvalidParameterException("找不到合适的LockModel");
            }
        }
    }
    

    使用时引入jar包,在resource目录下放入redis配置文件,配置好factoryBean 就可以愉快的使用啦~

    {
      "singleServerConfig": {
        "idleConnectionTimeout": 10000,
        "pingTimeout": 1000,
        "connectTimeout": 10000,
        "timeout": 3000,
        "retryAttempts": 3,
        "retryInterval": 1500,
        "reconnectionTimeout": 3000,
        "failedAttempts": 3,
        "password": null,
        "subscriptionsPerConnection": 5,
        "clientName": null,
        "address": "redis://192.168.0.133:6379",
        "subscriptionConnectionMinimumIdleSize": 1,
        "subscriptionConnectionPoolSize": 50,
        "connectionMinimumIdleSize": 10,
        "connectionPoolSize": 64,
        "database": 0,
        "dnsMonitoring": false,
        "dnsMonitoringInterval": 5000
      },
      "threads": 0,
      "nettyThreads": 0,
      "codec": null,
      "useLinuxNativeEpoll": false //redisson使用netty 需要指定是否使用epoll模式
    }
    

    总结


    总结一下,redisson利用redis工作线程为单线程,lua脚本能够原子执行的这一特性,利用lua脚本和redis的pub/sub特性实现了分布式、可重入的锁。在redisson的基础上进行了封装,并添加了与spring的整合。这个分布式锁没用什么高大上的原理,代码也写的很屌丝,但是足以支撑一般规模的业务。从这里我想到,很多时候业务上提的一些需求,我们其实都应该多思考一下,将其抽象,整合成一般的工具库,加快自己以及同事的开发效率。从另外一个角度来说,需求方可能也要给码农们稍微充裕一点的时间,多想,多思考,这样才能更好的推进公司的发展。

    相关文章

      网友评论

          本文标题:利用redis实现简单的分布式锁

          本文链接:https://www.haomeiwen.com/subject/kzcxbttx.html