美文网首页
分布式锁

分布式锁

作者: 米刀灵 | 来源:发表于2017-09-20 15:17 被阅读75次

    有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。单机的可以使用ReentrantLock或者synchronized代码块来实现,但是这些API在分布式场景中就无能为力了。
    针对分布式锁的实现,目前比较常用的有以下几种方案:基于数据库, 基于缓存(redis,memcached),基于Zookeeper。

    应用的场景例子:
    管理后台的部署架构(多台tomcat服务器+redis【多台tomcat服务器访问一台redis】+mysql【多台tomcat服务器访问一台服务器上的mysql】)就满足使用分布式锁的条件。多台服务器要访问redis全局缓存的资源,如果不使用分布式锁就会出现问题。 看如下伪代码:

        long N=0L;
        //N从redis获取值
        if(N<5){
        N++;
        //N写回redis
        }
    

    从redis获取值N,对数值N进行边界检查,自加1,然后N写回redis中。 这种应用场景很常见,像秒杀,全局递增ID、IP访问限制等。

    基于redis:

    使用 SETNX key value 命令。
    设置成功,返回1,加锁。该客户端最后可以通过DEL key来释放该锁。
    设置失败,返回0,获取锁失败。这时我们可以先返回或进行重试等对方完成或等待锁超时。

    存在的问题:

    • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直存在,其他线程无法再获得到锁。
    • 这把锁只能是非阻塞的,无论成功还是失败都直接返回。
    • 这把锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在tair中已经存在。无法再执行put操作。

    解决方式:

    • 用put方法支持传入失效时间,到达时间之后数据会自动删除。
    • while重复执行。
    • 在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。

    仍然存在的问题:

    • 为什么不直接使用expire设置超时时间,而将时间的毫秒数其作为value放在redis中。因为假如在setnx后,redis崩溃了,expire就没有执行(set和expire设置失效时间只能分两步执行),结果就是死锁了。锁永远不会超时。
    • 对于失效时间,如果如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在。

    具体实现:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    public class RedisLock {
    
        private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
        private RedisTemplate redisTemplate;
        private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    
        private String lockKey;
    
        //锁超时时间,防止线程在入锁以后,无限的执行等待    
        private int expireMsecs = 60 * 1000;
        //获取锁失败后,锁等待时间。防止无限期等待
        private int timeoutMsecs = 10 * 1000;
        //是否加锁标志位
        private volatile boolean locked = false;
    
        //重载构造方法
        public RedisLock(RedisTemplate redisTemplate, String lockKey) {
            this.redisTemplate = redisTemplate;
            this.lockKey = lockKey + "_lock";
        }
        public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
            this(redisTemplate, lockKey);
            this.timeoutMsecs = timeoutMsecs;
        }
        public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
            this(redisTemplate, lockKey, timeoutMsecs);
            this.expireMsecs = expireMsecs;
        }
    
    
        public String getLockKey() {
            return lockKey;
        }
    
        //redis在存储数据时,都把数据转化成了byte[]数组的形式,那么在存取数据时,需要用到Serializer将数据格式进行转化。
        //对redis进行get操作
        private String get(final String key) {
            Object obj = null;
            try {
                obj = redisTemplate.execute(new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisSerializer serializer = new StringRedisSerializer();
                        byte[] data = connection.get(serializer.serialize(key));
                        connection.close();
                        if (data == null) {
                            return null;
                        }
                        return serializer.deserialize(data);
                    }
                });
            } catch (Exception e) {
                logger.error("get redis error, key : {}", key);
            }
            return obj != null ? obj.toString() : null;
        }
    
        //对redis进行setNX操作
        private boolean setNX(final String key, final String value) {
            Object obj = null;
            try {
                obj = redisTemplate.execute(new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisSerializer serializer = new StringRedisSerializer();
                        Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                        connection.close();
                        return success;
                    }
                });
            } catch (Exception e) {
                logger.error("setNX redis error, key : {}", key);
            }
            return obj != null ? (Boolean) obj : false;
        }
    
        //对redis进行getset操作
        private String getSet(final String key, final String value) {
            Object obj = null;
            try {
                obj = redisTemplate.execute(new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisSerializer serializer = new StringRedisSerializer();
                        byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                        connection.close();
                        return serializer.deserialize(ret);
                    }
                });
            } catch (Exception e) {
                logger.error("setNX redis error, key : {}", key);
            }
            return obj != null ? (String) obj : null;
        }
    
        /**
         * 使用setnx命令,缓存了锁。, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
         * 执行过程:
         * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁。
         * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值,成功获得锁。
         */
        public synchronized boolean lock() throws InterruptedException {
            int timeout = timeoutMsecs;
            while (timeout >= 0) {
                long expires = System.currentTimeMillis() + expireMsecs + 1;
                String expiresStr = String.valueOf(expires); //锁到期时间
                
                //加锁。如果不存在加锁成功,返回true。
                if (this.setNX(lockKey, expiresStr)) {
                    //得到锁
                    locked = true;
                    return true;
                }
    
                //lockKey已存在,获取lockKey里的时间。
                String currentValueStr = this.get(lockKey);
                //判断是否为空,是否过期。
                if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                    //过期,设置现在的锁到期时间。只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的。
                    String oldValueStr = this.getSet(lockKey, expiresStr);
    
                    if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                        //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                        //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
    
                        //得到锁
                        locked = true;
                        return true;
                    }
                }
                timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
                Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
            }
            //已存在lockKey,且未过期,则获取锁失败
            return false;
        }
    
    
        //解锁。直接删除lockKey即可
        public synchronized void unlock() {
            if (locked) {
                redisTemplate.delete(lockKey);
                locked = false;
            }
        }
    
        public static void main(String[] args) throws Exception {
            RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
            try{
                if(lock.lock()) {
                    //需要加锁的代码
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而超时,此时锁已经被别人获得,这时就不必解锁了。
                lock.unlock();
            }
        }
    
    }
    

    基于zookeeper:

    • 加锁:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的节点下生成一个唯一的临时(挥发性)有序(自增长)节点。
    • 获取锁:调用getChildren(“locker”)来获取locker下面的所有子节点。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小(自增长,先到先得),那么就认为该客户端获取到了锁。
    • 阻塞:如果创建的节点并非locker所有子节点中最小的,则没有获取到锁,此时找到比自己小的那个节点,然后对其调用exist()方法,并注册监听器。如果这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如果是则获取到了锁,否则重复以上步骤。
    • 释放锁:只需将这个临时节点删除即可。

    左边的整个区域表示一个Zookeeper集群。locker是Zookeeper的一个持久节点。node_x是locker这个持久节点下面的临时顺序节点。client_x表示多个客户端。Service表示需要互斥访问的共享资源。

    Zookeeper如何解决前面提到的问题:

    • 锁无法释放?客户端可以在Zookeeper中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
    • 非阻塞锁? 客户端可以通过在ZK中创建顺序(自增长)节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
    • 不可重入? 客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中。下次想要获取锁的时候和当前最小的节点中的数据比对,如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。

    仍然存在的问题:

    • 性能上没有缓存高。每次都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
    • 如果由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁。可以设置重试策略。多次重试之后还不行的话才会删除临时节点。

    curator实现zookeeper的分布式锁:

        package bjsxt.curator.lock;
        import java.text.SimpleDateFormat;
        import java.util.Date;
        import java.util.concurrent.CountDownLatch;
        import org.apache.curator.RetryPolicy;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.curator.framework.CuratorFrameworkFactory;
        import org.apache.curator.framework.recipes.locks.InterProcessMutex;
        import org.apache.curator.retry.ExponentialBackoffRetry;
    
    
        public class Lock {
    
            /** zookeeper地址 */
            static final String CONNECT_ADDR = "192.168.0.4:2181,192.168.0.9:2181,192.168.0.6:2181";
            /** session超时时间 */
            static final int SESSION_OUTTIME = 5000;
            
            static int count = 10;
            public static void genarNo(){
                try {
                    count--;
                    System.out.println(Thread.currentThread().getName()+" : "+count);
                } finally {
                
                }
            }
            
            public static void main(String[] args) throws Exception {
                
                //1 重试策略:初试时间为1s 重试10次
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                //2 通过工厂创建连接
                CuratorFramework cf = CuratorFrameworkFactory.builder()
                                                             .connectString(CONNECT_ADDR)
                                                             .sessionTimeoutMs(SESSION_OUTTIME)
                                                             .retryPolicy(retryPolicy)
                                                             //.namespace("super")
                                                             .build();
                //3 开启连接
                cf.start();
                
                //4 分布式锁
                final InterProcessMutex lock = new InterProcessMutex(cf, "/lockpath");
                final CountDownLatch countdown = new CountDownLatch(1);
                
                for(int i = 0; i < 10; i++){
    
                    //循环开始10个线程,模仿多个客户端同时操作
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                countdown.await();
                                //加锁
                                lock.acquire();
                                //-------------业务处理开始
                                genarNo();
                                //-------------业务处理结束
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                try {
                                    //释放
                                    lock.release();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }, "t"+i ).start();
                }
                Thread.sleep(100);
                countdown.countDown();
            }
        }
    

    运行结果为依次打印出9876543210。
    不加锁的话,一种可能的结果为:7564271703。


    参考:
    http://www.jianshu.com/p/c77a5257303a
    http://blog.csdn.net/pengshuai128/article/details/70593995
    http://www.cnblogs.com/520playboy/p/6441651.html

    相关文章

      网友评论

          本文标题:分布式锁

          本文链接:https://www.haomeiwen.com/subject/udbwsxtx.html