美文网首页
分布式锁zk|redis实现demo

分布式锁zk|redis实现demo

作者: 笔_尖 | 来源:发表于2019-08-29 11:09 被阅读0次

    [toc]

    redis分布式锁

    /**
     * Redis-backed distributed lock.
     *
     * <p>Acquisition uses a single {@code setIfAbsent} call that sets the key together with its
     * expiry. Release runs a compare-and-delete Lua script so that a caller only deletes the key
     * while it still holds the value that caller stored.
     *
     * @author yujianjian  2019-08-28 17:28
     */
    @Slf4j
    @Service
    public class RedisLock {

        /**
         * Compare-and-delete script used by {@link #releaseLock(String, String)}.
         *
         * <p>The expected value is read from {@code ARGV[1]}. Passing it as a second key makes Redis
         * Cluster reject the call when the two "keys" hash to different slots. {@code KEYS[2]} is still
         * accepted as a fallback so that other users of the injected script bean that follow the old
         * calling convention keep working.
         */
        private static final String RELEASE_SCRIPT =
                "local expected = ARGV[1] or KEYS[2] "
                        + "if redis.call('get', KEYS[1]) == expected then return redis.call('del', KEYS[1]) else return 0 end";

        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private DefaultRedisScript<Long> redisScript;

        /** Script result that signals the key was deleted. */
        private static final Long SUCCESS = 1L;

        /**
         * Tries to acquire the lock exactly once.
         *
         * @param lockKey    key of the lock
         * @param value      value stored under the key; must be passed again to release the lock
         * @param expireTime expiry of the key, in seconds
         * @return {@code true} if the key was set by this call; {@code false} if it already existed
         *         or the Redis call failed
         */
        public boolean tryLock(String lockKey, String value, int expireTime) {
            try {
                // setIfAbsent returns a nullable Boolean; compare explicitly instead of auto-unboxing.
                return Boolean.TRUE.equals(
                        redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS));
            } catch (Exception e) {
                log.error("RedisLock|tryLock fail lockKey={}|value={}|expireTime={}", lockKey, value, expireTime, e);
            }
            return false;
        }

        /**
         * Acquires the lock, retrying up to a maximum number of attempts.
         *
         * <p>At most {@code tryTimes} attempts are made (none when {@code tryTimes <= 0}). The method
         * sleeps only between failed attempts, never after a success or after the last attempt.
         * If the thread is interrupted while sleeping, its interrupt status is restored and the
         * method gives up.
         *
         * @param lockKey     key of the lock
         * @param value       value stored under the key
         * @param expireTime  expiry of the key, in seconds
         * @param tryTimes    maximum number of attempts
         * @param sleepMillis pause between two attempts, in milliseconds
         * @return {@code true} if the lock was acquired
         */
        public boolean getLock(String lockKey, String value, int expireTime, int tryTimes, long sleepMillis) {
            for (int attempt = 1; attempt <= tryTimes; attempt++) {
                if (tryLock(lockKey, value, expireTime)) {
                    return true;
                }
                if (attempt == tryTimes) {
                    // No further attempt will follow, so there is nothing to wait for.
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up the stack can observe the interruption.
                    Thread.currentThread().interrupt();
                    log.error("getLock fail", e);
                    return false;
                }
            }
            return false;
        }

        /**
         * Releases the lock if it is still held with the given value.
         *
         * @param lockKey key of the lock
         * @param value   value that was supplied when the lock was acquired
         * @return {@code true} if the key was deleted; {@code false} if the stored value did not
         *         match, the key no longer existed, or the Redis call failed
         */
        public boolean releaseLock(String lockKey, String value) {
            try {
                // The lock key is the only real key; the expected value travels as a script argument.
                Long result = redisTemplate.execute(redisScript, Arrays.asList(lockKey), value);
                return SUCCESS.equals(result);
            } catch (Exception e) {
                log.error("RedisLock|releaseLock fail lockKey={}|value={}", lockKey, value, e);
            }
            return false;
        }

        /**
         * Registers the compare-and-delete script as a bean with a {@code Long} result type.
         *
         * @return the release script
         */
        @Bean
        public DefaultRedisScript<Long> defaultRedisScript() {
            DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
            defaultRedisScript.setResultType(Long.class);
            defaultRedisScript.setScriptText(RELEASE_SCRIPT);
            return defaultRedisScript;
        }

    }
    

    zk分布式锁

    zk分布式锁实现原理图

    zk-dispatcher-lock.png

    pom中添加依赖

     <!-- Curator recipes, version 4.0.1; the slf4j-log4j12 binding is excluded
          (presumably to avoid a clash with the application's own SLF4J binding; confirm). -->
     <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- Curator framework (the ZooKeeper client), same version and same exclusion as above. -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

    配置文件(YAML 格式)中添加配置信息

    curator:
      # Number of retries
      retryCount: 5
      # Interval between retries, in milliseconds
      elapsedTimeMs: 5000
      # ZooKeeper address (host:port)
      connectString: 127.0.0.1:2181
      # Session timeout, in milliseconds
      sessionTimeoutMs: 60000
      # Connection timeout, in milliseconds
      connectionTimeoutMs: 5000
    

    代码中增加连接信息

    
    /**
     * Spring configuration that builds the Curator client from the {@code curator.*} properties.
     */
    @Configuration
    public class CuratorConfig {

        /** ZooKeeper connection string. */
        @Value("${curator.connectString}")
        private String connectString;

        /** Session timeout in milliseconds. */
        @Value("${curator.sessionTimeoutMs}")
        private int sessionTimeoutMs;

        /** Connection timeout in milliseconds. */
        @Value("${curator.connectionTimeoutMs}")
        private int connectionTimeoutMs;

        /** Number of retries handed to the retry policy. */
        @Value("${curator.retryCount}")
        private int retryCount;

        /** Pause between retries in milliseconds, handed to the retry policy. */
        @Value("${curator.elapsedTimeMs}")
        private int elapsedTimeMs;

        /**
         * Creates the Curator client; Spring calls {@code start} on it as the bean init method.
         *
         * @return the configured client, not yet started when returned
         */
        @Bean(initMethod = "start")
        public CuratorFramework curatorFramework() {
            RetryNTimes retryPolicy = new RetryNTimes(retryCount, elapsedTimeMs);
            return CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .build();
        }
    }
    

    锁的代码实现

    /**
     * @author yujianjian  2019-08-29 09:44
     */
    @Service
    @Slf4j
    public class DistributedLockByCurator implements InitializingBean {
    
    
        // zk下的创建的目录
        private final static String ROOT_PATH_LOCK = "rootlock";
    
        private CountDownLatch cdl = new CountDownLatch(1);
    
        @Autowired
        private CuratorFramework curatorFramework;
    
    
        /**
         * 获取分布式锁
         */
        public void acquireLock(String path) {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            while (true) {
                try {
                    curatorFramework
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(keyPath);
                    log.info("acquireLock|success to acquire lock for path:{}", keyPath);
                    break;
                } catch (Exception e) {
                    log.info("acquireLock|failed to acquire lock for path:{}", keyPath);
                    log.info("acquireLock|while try again ...");
                    try {
                        if (cdl.getCount() <= 0) {
                            cdl = new CountDownLatch(1);
                        }
                        cdl.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 释放分布式锁
         */
        public boolean releaseLock(String path) {
            try {
                String keyPath =  "/" + ROOT_PATH_LOCK + "/" + path;
                if (curatorFramework.checkExists().forPath(keyPath) != null) {
                    curatorFramework.delete().forPath(keyPath);
                }
            } catch (Exception e) {
                log.error("releaseLock|failed to release lock|path:{}", path, e);
                return false;
            }
            return true;
        }
    
    
        /**
         * 创建 watcher 事件
         */
        private void addWatch(String path) throws Exception {
            String keyPath;
            if (path.equals(ROOT_PATH_LOCK)) {
                keyPath = "/" + path;
            } else {
                keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            }
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client, event) -> {
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    log.info("addWatch|success to release lock for path:{}", oldPath);
                    if (oldPath.contains(path)) {
                        //释放计数器,让当前的请求获取锁
                        cdl.countDown();
                    }
                }
            });
        }
    
    
        //创建父节点,并创建永久节点
        @Override
        public void afterPropertiesSet() throws Exception {
            curatorFramework = curatorFramework.usingNamespace("lock-namespace");
            String path = "/" + ROOT_PATH_LOCK;
            try {
                if (curatorFramework.checkExists().forPath(path) == null) {
                    curatorFramework.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(path);
                }
                addWatch(ROOT_PATH_LOCK);
                log.info("root path 的 watcher 事件创建成功");
            } catch (Exception e) {
                log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
            }
        }
    }
    

    测试代码

       // docker exec -it zk /bin/bash 进入zk容器
       // 在bin下执行 zkCli.sh 进入zk终端
   // 执行 ls /lock-namespace/rootlock,可以看到其下是否存在加锁节点;锁释放后对应的节点会被删除
       
        @Autowired
        private DistributedLockByCurator distributedLockByCurator;
       
        /**
         * Acquires two locks one after the other, holds each for 60 seconds and prints the result
         * of releasing it. Each release runs in a {@code finally} block so the lock node is removed
         * even if the sleep is interrupted.
         */
        @Test
        public void getZkLock() throws Exception {
            boolean result;
            distributedLockByCurator.acquireLock("test1");
            try {
                Thread.sleep(60000);
            } finally {
                result = distributedLockByCurator.releaseLock("test1");
            }
            System.out.println("释放锁1的结果 = " + result);

            boolean result2;
            distributedLockByCurator.acquireLock("lock2");
            try {
                Thread.sleep(60000);
            } finally {
                result2 = distributedLockByCurator.releaseLock("lock2");
            }
            System.out.println("释放锁2的结果 = " + result2);

        }
       
       
    
    

    相关文章

      网友评论

          本文标题:分布式锁zk|redis实现demo

          本文链接:https://www.haomeiwen.com/subject/yqemectx.html