美文网首页
zookeeper分布式锁demo(v1)

zookeeper分布式锁demo(v1)

作者: rock_fish | 来源:发表于2021-11-25 16:24 被阅读0次

    zk客户端依赖

    <!-- Logging backend pulled in for the client libraries. -->
    <!-- NOTE(review): log4j-core 2.8.2 predates the fixes for CVE-2021-44228 (Log4Shell); consider 2.17.1 or later - confirm against the project's Java baseline. -->
    <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.8.2</version>
            </dependency>
    
            <!-- Plain ZooKeeper client, used by the hand-written DistributedLock example. -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.5.7</version>
            </dependency>
    
            <!-- Curator modules, used by the CuratorLockTest example; all three are pinned to the same version. -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.3.0</version>
            </dependency>
    

    DIY ZK 分布式锁

    package com.rock.case2;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * ZooKeeper-based distributed lock, v1.
     *
     * <p>Each contender creates one ephemeral sequential node under the lock root. The contender
     * whose node sorts first owns the lock; every other contender watches only the node directly
     * ahead of its own, so a release wakes a single waiter instead of the whole herd.
     *
     * <p>Typical use: call {@link #init()} once, then pair {@link #zklock()} with
     * {@link #unZkLock()}, and call {@link #close()} when the lock object is no longer needed.
     *
     * <p>Not supported yet: acquisition timeout, read/write locks, re-entrancy. An instance is
     * meant to be driven by one thread at a time.
     */
    public class DistributedLock {
    
        /** Name prefix of the sequential child nodes; ZooKeeper appends the sequence number. */
        private static final String NODE_PREFIX = "w";
    
        private final String connectString;
        private final int sessionTimeout;
        private final String lockRootPath;
    
        /** Connected client, or {@code null} before {@link #init()} / after {@link #close()}. */
        private ZooKeeper zk;
    
        /** Full path of the node that currently holds the lock, or {@code null} when not held. */
        private String currentNode;
    
        /**
         * Creates a lock handle; no connection is opened until {@link #init()} is called.
         *
         * @param connectString  ZooKeeper connect string, e.g. {@code host:2181}
         * @param sessionTimeout session timeout in milliseconds, passed to the ZooKeeper client
         * @param lockName       absolute path of the lock root node, e.g. {@code /locks}
         * @throws IllegalArgumentException if {@code connectString} is empty or {@code lockName}
         *                                  is not an absolute, non-root path without a trailing slash
         */
        public DistributedLock(String connectString, int sessionTimeout, String lockName) {
            if (connectString == null || connectString.isEmpty()) {
                throw new IllegalArgumentException("connectString must not be empty");
            }
            if (lockName == null || lockName.length() < 2
                    || !lockName.startsWith("/") || lockName.endsWith("/")) {
                throw new IllegalArgumentException(
                        "lockName must be an absolute path such as /locks, got: " + lockName);
            }
            this.connectString = connectString;
            this.sessionTimeout = sessionTimeout;
            this.lockRootPath = lockName;
        }
    
        /**
         * Connects to ZooKeeper and makes sure the lock root node exists.
         *
         * <p>Calling this again on an already initialised instance is a no-op, so a session is
         * never opened twice.
         *
         * @throws IOException          if the client cannot be created
         * @throws KeeperException      if the root node cannot be checked or created
         * @throws InterruptedException if interrupted while waiting for the connection
         */
        public void init() throws IOException, KeeperException, InterruptedException {
            if (zk != null) {
                return;
            }
            // A latch per attempt: a retry after a failed init must wait for its own connection.
            final CountDownLatch connected = new CountDownLatch(1);
            ZooKeeper client = new ZooKeeper(connectString, sessionTimeout, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            });
    
            boolean ready = false;
            try {
                connected.await();
                ensureRootExists(client);
                ready = true;
            } finally {
                if (!ready) {
                    // Do not leak the session when initialisation fails half way.
                    client.close();
                }
            }
            zk = client;
        }
    
        /** Creates the persistent lock root node unless it is already there. */
        private void ensureRootExists(ZooKeeper client) throws KeeperException, InterruptedException {
            if (client.exists(lockRootPath, false) != null) {
                return;
            }
            try {
                client.create(lockRootPath, lockRootPath.getBytes(StandardCharsets.UTF_8),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Another client created the root between exists() and create(); nothing to do.
            }
        }
    
        /**
         * Blocks until this instance owns the lock.
         *
         * <p>Exactly one ephemeral sequential node is created per call. If acquisition fails with an
         * exception, that node is removed again so it cannot block other contenders.
         *
         * @throws IllegalStateException if {@link #init()} has not been called, or the lock is
         *                               already held by this instance (no re-entrancy)
         * @throws KeeperException       on ZooKeeper errors
         * @throws InterruptedException  if interrupted while waiting
         */
        public void zklock() throws KeeperException, InterruptedException {
            if (zk == null) {
                throw new IllegalStateException("init() must be called before zklock()");
            }
            if (currentNode != null) {
                throw new IllegalStateException("Lock already held via " + currentNode
                        + "; re-entrant locking is not supported");
            }
    
            String node = zk.create(lockRootPath + "/" + NODE_PREFIX, null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            boolean acquired = false;
            try {
                String nodeName = node.substring(lockRootPath.length() + 1);
                String predecessor;
                // Re-evaluate after every wake-up: the predecessor may have gone away without
                // ever holding the lock, in which case there is a new predecessor to watch.
                while ((predecessor = findPredecessor(nodeName)) != null) {
                    waitForDeletion(predecessor);
                }
                currentNode = node;
                acquired = true;
            } finally {
                if (!acquired) {
                    deleteQuietly(node);
                }
            }
        }
    
        /**
         * Returns the full path of the child that sorts directly before {@code nodeName}, or
         * {@code null} if {@code nodeName} is the first child (i.e. owns the lock).
         */
        private String findPredecessor(String nodeName) throws KeeperException, InterruptedException {
            // getChildren() gives no ordering guarantee, so sort before comparing positions.
            // All children share NODE_PREFIX and a zero-padded sequence, so natural order works.
            List<String> children = new ArrayList<>(zk.getChildren(lockRootPath, false));
            Collections.sort(children);
    
            int index = children.indexOf(nodeName);
            if (index < 0) {
                // Our own node vanished (for example the session was lost); we cannot take the lock.
                throw new KeeperException.NoNodeException(lockRootPath + "/" + nodeName);
            }
            return index == 0 ? null : lockRootPath + "/" + children.get(index - 1);
        }
    
        /** Blocks until the node at {@code path} changes or is deleted; returns at once if it is gone. */
        private void waitForDeletion(String path) throws KeeperException, InterruptedException {
            final CountDownLatch changed = new CountDownLatch(1);
            try {
                zk.getData(path, event -> {
                    // Any node event consumes the one-shot watch, so wake up and re-check.
                    // Session expiry must also wake the waiter, otherwise it would hang forever.
                    if (event.getType() != Watcher.Event.EventType.None
                            || event.getState() == Watcher.Event.KeeperState.Expired) {
                        changed.countDown();
                    }
                }, new Stat());
            } catch (KeeperException.NoNodeException ignored) {
                // The predecessor disappeared before the watch was set; try for the lock again.
                return;
            }
            changed.await();
        }
    
        /** Best-effort removal of a node created during a failed acquisition. */
        private void deleteQuietly(String node) {
            try {
                zk.delete(node, -1);
            } catch (KeeperException ignored) {
                // The node is ephemeral, so it disappears with the session at the latest.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    
        /**
         * Releases the lock by deleting this instance's node. Does nothing if the lock is not held.
         *
         * @throws IllegalStateException if the node could not be deleted; the lock is then still
         *                               considered held, so this method may be called again
         */
        public void unZkLock() {
            String node = currentNode;
            if (node == null) {
                return;
            }
            try {
                zk.delete(node, -1);
            } catch (KeeperException.NoNodeException ignored) {
                // Already gone, which is the state we wanted.
            } catch (KeeperException e) {
                throw new IllegalStateException("Failed to delete lock node " + node, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while deleting lock node " + node, e);
            }
            currentNode = null;
        }
    
        /**
         * Closes the ZooKeeper session. Because the lock node is ephemeral, a lock that is still
         * held is released as part of closing the session.
         *
         * @throws InterruptedException if interrupted while closing the client
         */
        public void close() throws InterruptedException {
            ZooKeeper client = zk;
            zk = null;
            currentNode = null;
            if (client != null) {
                client.close();
            }
        }
    
    }
    
    
    
    使用测试
    package com.rock.case2;
    
    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    
    /**
     * Demo: two clients repeatedly contend for the same lock path. Each one takes the lock,
     * holds it for a few seconds, and releases it again.
     */
    public class DistributedLockTest {
    
        /** How many times each worker acquires and releases the lock. */
        private static final int ROUNDS = 10;
    
        /** How long a worker holds the lock per round. */
        private static final long HOLD_MILLIS = 5 * 1000;
    
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
    
            final DistributedLock lock1 = new DistributedLock("xxx:2181", 10000, "/rocklocks");
    
            final DistributedLock lock2 = new DistributedLock("xxx:2181", 10000, "/rocklocks");
    
            new Thread(() -> contend(lock1, "线程1")).start();
    
            new Thread(() -> contend(lock2, "线程2")).start();
        }
    
        /**
         * Connects once, then runs {@link #ROUNDS} lock/hold/unlock cycles.
         *
         * @param lock  the lock handle owned by this worker
         * @param label prefix used in the console output
         */
        private static void contend(DistributedLock lock, String label) {
            try {
                // Connect once per worker; connecting inside the loop would open a session per round.
                lock.init();
    
                for (int i = 0; i < ROUNDS; i++) {
                    lock.zklock();
                    try {
                        System.out.println(label + " 启动,获取到锁");
                        Thread.sleep(HOLD_MILLIS);
                    } finally {
                        // Release even if the hold is interrupted, so the other worker is not starved.
                        lock.unZkLock();
                    }
                    System.out.println(label + " 释放锁");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (IOException | KeeperException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    curator 自带分布式锁

    package com.atguigu.case3;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
     * Demo of Curator's built-in {@code InterProcessMutex}: two clients contend for the same
     * lock path, and each acquires the lock twice to show that it is re-entrant.
     */
    public class CuratorLockTest {
    
        public static void main(String[] args) {
    
            // Distributed lock 1 (own client)
            InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
    
            // Distributed lock 2 (own client)
            InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    
            new Thread(() -> lockTwice(lock1, "线程1")).start();
    
            new Thread(() -> lockTwice(lock2, "线程2")).start();
        }
    
        /**
         * Acquires {@code lock} twice, holds it for five seconds, then releases it twice.
         * Every acquire is paired with a release in a {@code finally} block, so the lock is
         * given back even when the hold phase fails.
         *
         * @param lock  the mutex owned by this worker
         * @param label prefix used in the console output
         */
        private static void lockTwice(InterProcessMutex lock, String label) {
            try {
                lock.acquire();
                try {
                    System.out.println(label + " 获取到锁");
    
                    lock.acquire();
                    try {
                        System.out.println(label + " 再次获取到锁");
    
                        Thread.sleep(5 * 1000);
                    } finally {
                        lock.release();
                        System.out.println(label + " 释放锁");
                    }
                } finally {
                    lock.release();
                    System.out.println(label + "  再次释放锁");
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
            }
        }
    
        /** Builds and starts a new Curator client with an exponential back-off retry policy. */
        private static CuratorFramework getCuratorFramework() {
    
            ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
                    .connectionTimeoutMs(2000)
                    .sessionTimeoutMs(2000)
                    .retryPolicy(policy).build();
    
            // Start the client
            client.start();
    
            // NOTE(review): this is printed right after start(); it presumably does not prove the
            // connection is established yet - confirm against the Curator documentation.
            System.out.println("zookeeper 启动成功");
            return client;
        }
    }
    
    

    参考资料


    zookeeper实现分布式锁和配置中心
    【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析
    分布式锁(一)__基于Zookeeper实现可重入分布式锁

    相关文章

      网友评论

          本文标题:zookeeper分布式锁demo(v1)

          本文链接:https://www.haomeiwen.com/subject/xuontrtx.html