SpringBoot集成Curator实现分布式锁

作者: 移动的红烧肉 | 来源:发表于2018-05-22 20:26 被阅读576次

    1.Curator 介绍

    • 摘录官网的介绍
      Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

    • 官网地址

    2.依赖

    <!-- zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-framework</artifactId>
         <version>2.12.0</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>2.12.0</version>
    </dependency>
    

    3.配置文件

    • application.properties 中添加如下配置
    curator.retryCount=5  #重试次数
    curator.elapsedTimeMs=5000   #重试间隔时间
    curator.connectString=127.0.0.1:2181   # zookeeper 地址
    curator.sessionTimeoutMs=60000  # session超时时间
    curator.connectionTimeoutMs=5000  # 连接超时时间
    

    4.配置类编写

    • 1)配置类
    package com.zxr.micro.config.zookeeper;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    @Data
    @Component
    @ConfigurationProperties(prefix = "curator")
    public class WrapperZk {
    
        private int retryCount;
    
        private int elapsedTimeMs;
    
        private String connectString;
    
        private int sessionTimeoutMs;
    
        private int connectionTimeoutMs;
    }  
    
    • 2)配置中心
    package com.zxr.micro.config.zookeeper;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ZkConfiguration {
    
        @Autowired
        WrapperZk wrapperZk;
    
        @Bean(initMethod = "start")
        public CuratorFramework curatorFramework() {
            return CuratorFrameworkFactory.newClient(
                    wrapperZk.getConnectString(),
                    wrapperZk.getSessionTimeoutMs(),
                    wrapperZk.getConnectionTimeoutMs(),
                    new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
        }
    }
    
    

    5.分布式锁实现

    package com.zxr.micro.config.zookeeper;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.CountDownLatch;
    
    @Slf4j
    @Service
    public class DistributedLockByZookeeper implements InitializingBean{
    
        private final static String ROOT_PATH_LOCK = "rootlock";
        private CountDownLatch countDownLatch = new CountDownLatch(1);
    
        @Autowired
        private CuratorFramework curatorFramework;
    
        /**
         * 获取分布式锁
         */
        public void acquireDistributedLock(String path) {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            while (true) {
                try {
                    curatorFramework
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(keyPath);
                    log.info("success to acquire lock for path:{}", keyPath);
                    break;
                } catch (Exception e) {
                    log.info("failed to acquire lock for path:{}", keyPath);
                    log.info("while try again .......");
                    try {
                        if (countDownLatch.getCount() <= 0) {
                            countDownLatch = new CountDownLatch(1);
                        }
                        countDownLatch.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 释放分布式锁
         */
        public boolean releaseDistributedLock(String path) {
            try {
                String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
                if (curatorFramework.checkExists().forPath(keyPath) != null) {
                    curatorFramework.delete().forPath(keyPath);
                }
            } catch (Exception e) {
                log.error("failed to release lock");
                return false;
            }
            return true;
        }
    
        /**
         * 创建 watcher 事件
         */
        private void addWatcher(String path) throws Exception {
            String keyPath;
            if (path.equals(ROOT_PATH_LOCK)) {
                keyPath = "/" + path;
            } else {
                keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            }
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client, event) -> {
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    log.info("上一个节点 "+ oldPath + " 已经被断开");
                    if (oldPath.contains(path)) {
                        //释放计数器,让当前的请求获取锁
                        countDownLatch.countDown();
                    }
                }
            });
        }
    
         //创建父节点,并创建永久节点
        @Override
        public void afterPropertiesSet() {
            curatorFramework = curatorFramework.usingNamespace("lock-namespace");
            String path = "/" + ROOT_PATH_LOCK;
            try {
                if (curatorFramework.checkExists().forPath(path) == null) {
                    curatorFramework.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(path);
                }
                addWatcher(ROOT_PATH_LOCK);
                log.info("root path 的 watcher 事件创建成功");
            } catch (Exception e) {
                log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
            }
        }
    }
    
    

    6.测试

    • 弄两个 GET 请求测一下,看看效果
    package com.zxr.micro.controller;
    
    import com.zxr.micro.common.RespMsg;
    import com.zxr.micro.config.zookeeper.DistributedLockByZookeeper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/hello")
    public class MicroController {
    
        @Autowired
        private DistributedLockByZookeeper distributedLockByZookeeper;
    
        private final static String PATH = "test";
    
        @GetMapping("/lock1")
        public RespMsg<Boolean> getLock1() {
            Boolean flag;
            distributedLockByZookeeper.acquireDistributedLock(PATH);
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            }
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            return RespMsg.success(flag);
        }
    
        @GetMapping("/lock2")
        public RespMsg getLock2() {
            Boolean flag;
            distributedLockByZookeeper.acquireDistributedLock(PATH);
            try {
                Thread.sleep(15000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            }
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            return RespMsg.success(flag);
        }
    }
    
    

    7.注意

    • 这个 DistributedLockByZookeeper 中的 afterPropertiesSet 可以使用@Bean的initMethod 初始化创建

    相关文章

      网友评论

        本文标题:SpringBoot集成Curator实现分布式锁

        本文链接:https://www.haomeiwen.com/subject/ltmkjftx.html