美文网首页
Curator框架实现ZooKeeper分布式锁

Curator框架实现ZooKeeper分布式锁

作者: 爪哇驿站 | 来源:发表于2021-01-13 11:16 被阅读0次
    一、引入相关jar包
    <!--引入zk依赖-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.1</version>
        <!--排除这个slf4j-log4j12-->
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--引入curator依赖-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>
    
    二、ZkLock编写
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
    import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
    import org.apache.curator.retry.RetryNTimes;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class ZkLock {
        static CuratorFramework zkClient = null;
    
        static {
            zkClient = CuratorFrameworkFactory.newClient(
                    "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                    20000,
                    20000,
                    new RetryNTimes(3, 5000));
            zkClient.start();
        }
    
        /**
         * 获取互斥锁
         *
         * @param name
         * @return
         * @throws Exception
         */
        public InterProcessMutex getLock(String name) throws Exception {
            return new InterProcessMutex(zkClient, buildPath(name));
        }
    
        /**
         * 获取可重入互斥锁
         *
         * @param name
         * @return
         * @throws Exception
         */
        public InterProcessMutex getMutexLock(String name) throws Exception {
            return new InterProcessMutex(zkClient, buildPath(name));
        }
    
        /**
         * 获取不可重入互斥锁
         *
         * @param name
         * @return
         * @throws Exception
         */
        public InterProcessSemaphoreMutex getSemaphoreLock(String name) throws Exception {
            return new InterProcessSemaphoreMutex(zkClient, buildPath(name));
        }
    
        /**
         * 获取读写锁
         *
         * @param name
         * @return
         * @throws Exception
         */
        public InterProcessReadWriteLock getReadWriteLock(String name) throws Exception {
            return new InterProcessReadWriteLock(zkClient, buildPath(name));
        }
    
    
        /**
         * 获取多锁(集合锁)
         *
         * @param names
         * @return
         * @throws Exception
         */
        public InterProcessMultiLock getMutilLock(List<String> names) throws Exception {
            return new InterProcessMultiLock(zkClient, mutilPath(names));
        }
    
        /**
         * 创建多个lock节点
         */
        public List<String> mutilPath(List<String> names) {
            List<String> paths = new ArrayList<>();
            for (String name : names) {
                paths.add(buildPath(name));
            }
            return paths;
        }
    
        /**
         * 创建lock节点
         *
         * @param name
         * @return
         */
        public String buildPath(String name) {
            String path = "";
            String[] roots = new String[]{"mg", "mylock"};
            for (String str : roots) {
                if (str.startsWith("/")) {
                    path += "/";
                }
                path += "/" + str;
            }
            path += "/" + name;
            return path;
        }
    }
    
    三、使用示例
    import com.demo.zk.utils.ZkLock;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
    import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
    
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class ZkLockTest {
        public void lock1(InterProcessMutex lock) throws Exception {
            lock.acquire();
            log.info("lock1成功获取锁");
            lock2(lock);
            lock.release();
            log.info("lock1成功释放锁");
        }
    
        public void lock2(InterProcessMutex lock) throws Exception {
            lock.acquire();
            log.info("lock2成功获取锁");
            Thread.sleep(1000*10);
    
            lock.release();
            log.info("lock2成功释放锁");
        }
    
    
        public void lock3(InterProcessSemaphoreMutex lock) throws Exception {
            lock.acquire();
            log.info("lock3成功获取锁");
            lock4(lock);
            lock.release();
            log.info("lock3成功释放锁");
        }
    
        public void lock4(InterProcessSemaphoreMutex lock) throws Exception {
            log.info("lock4尝试获取锁");
            boolean result = lock.acquire(1000*2, TimeUnit.MILLISECONDS);
    
            if(result)
            {
                log.info("lock4成功获取锁");
                Thread.sleep(1000*10);
                lock.release();
                log.info("lock4成功释放锁");
            }
            else {
                log.info("lock4获取锁失败");
            }
        }
    
        public void buildReadTask(InterProcessMutex lock,String pre)
        {
            for(int i=0;i<5;i++)
            {
                Thread task = new Thread(()->{
    
                    try {
                        log.info("[{}]开始获取读锁",Thread.currentThread().getName());
                        lock.acquire();
                        log.info("[{}]获取读锁成功",Thread.currentThread().getName());
                        Thread.sleep(1000*5);
                        lock.release();
                        log.info("[{}]释放读锁",Thread.currentThread().getName());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                });
                task.setName(pre+"-mg-read-"+i);
                task.start();
            }
        }
    
        public void buildWriteTask(InterProcessMutex lock,String pre)
        {
            Thread task = new Thread(()->{
    
                try {
                    log.info("[{}]开始获取写锁",Thread.currentThread().getName());
                    lock.acquire();
                    log.info("[{}]获取写锁成功",Thread.currentThread().getName());
                    Thread.sleep(1000*5);
                    lock.release();
                    log.info("[{}]释放写锁",Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            });
            task.setName(pre+"-mg-wirte");
            task.start();
        }
    
    
        public static void main(String[] args) throws Exception {
            ZkLock zkLock = new ZkLock();
            ZkLockTest test = new ZkLockTest();
            // 可重入锁
            InterProcessMutex mutexLock = zkLock.getLock("demo");
            test.lock1(mutexLock);
            // 不可重入锁
            InterProcessSemaphoreMutex semaphoreMutexLock = zkLock.getSemaphoreLock("demo");
            test.lock3(semaphoreMutexLock);
            // 读写锁
            InterProcessReadWriteLock lock = zkLock.getReadWriteLock("demo");
            test.buildWriteTask(lock.writeLock(),"before");
            Thread.sleep(1000*2);
            test.buildReadTask(lock.readLock(),"before");
            Thread.sleep(1000*5);
            test.buildWriteTask(lock.writeLock(),"after");
        }
    }
    

    相关文章

      网友评论

          本文标题:Curator框架实现ZooKeeper分布式锁

          本文链接:https://www.haomeiwen.com/subject/zkruaktx.html