美文网首页
使用Zookeeper的Watcher实现分布式锁

使用Zookeeper的Watcher实现分布式锁

作者: Wannay | 来源:发表于2021-04-06 22:15 被阅读0次
    package com.wanna.zk.zkstudy;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * 利用zk实现分布式锁思路:使用临时顺序节点
     * 1.在执行加锁之前,先判断/LOCKS有没有被创建,如果没有则先创建
     * 2.如果创建了/LOCKS,则创建/LOCKS/LOCK_xxxxxxxx这样子的临时顺序节点
     * 3.尝试去加锁的逻辑,先获取/LOCKS/下所有的孩子节点,并进行排序
     * ---3.1如果当前锁在的位置是0,也就是第一个元素,那么就获取锁成功,开始执行业务代码
     * ---3.2如果当前锁所在的位置不是0,那么就获取它的前一个节点,并进行监控
     * ------如果前一个节点已经被删了,那么就继续尝试去获取锁
     * ------如果前一个节点还没被删,那么就阻塞当前线程,直到监听器(Watcher)对象将其唤醒才继续执行
     * 4.解锁逻辑,删除自己创建的LOCK这条记录,因为是临时顺序节点,因此就算机器宕机了也会自动删除锁
     */
    public class ZKLock {
        private static final String LOCK_ROOT = "/LOCKS";   //锁的根路径
        private static final String LOCK_NODE_NAME = "LOCK_";  //锁的名称,使用临时顺序节点
        private String lockPath;  //完整的锁路径
        //集群的连接字符串,机器ip:port中间用逗号分隔即可
        String connectString = "localhost:2181,localhost:2182,localhost:2183";  
        private ZooKeeper zooKeeper;  //zk对象
        private final Thread currentThread = Thread.currentThread();  //获取创建对象的线程,用来进行唤醒和阻塞
        private int zkSessionTimeout = 5000;  //sessionTimeout
    
        //判断某个元素是否被删除,如果删除了就将当前线程唤醒
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //如果获取到删除节点的事件,那么就唤醒当前线程
                if (event.getType() == Event.EventType.NodeDeleted) {
                    LockSupport.unpark(currentThread);  //将当前线程唤醒
                }
            }
        };
    
        public ZKLock() {
            try {
                zooKeeper = new ZooKeeper(connectString, zkSessionTimeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.None) {
                            //如果链接建立成功,打印相关信息
                            if (event.getState() == Event.KeeperState.SyncConnected) {
                                System.out.println("ZK连接建立成功");
                                LockSupport.unpark(currentThread);
                            }
                        }
                    }
                });
                LockSupport.park();   //因为是异步去进行连接,因此这里需要等待
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void lock() {
            try {
                createLock();  //createLock
                attemptLock();  //尝试去获取锁
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void createLock() throws KeeperException, InterruptedException {
            //判断LOCKS节点是否存在
            final Stat exists = zooKeeper.exists(LOCK_ROOT, false);
            //如果不存在则创建一个/LOCKS的持久节点
            if (exists == null) {
                zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            //创建临时有序节点,节点为/LOCKS/LOCK_xxxxxxxx
            lockPath = zooKeeper.create(LOCK_ROOT.concat("/").concat(LOCK_NODE_NAME), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("LOCK NODE ".concat(lockPath).concat(" has been created"));
        }
    
        //尝试去获取锁
        public void attemptLock() throws KeeperException, InterruptedException {
            //获取/LOCKS节点的孩子节点
            final List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
            //对children节点进行排序
            Collections.sort(children);
            //获取当前锁节点排序之后的下标,截取掉/LOCKS/之后的内容
            final int index = children.indexOf(lockPath.substring(LOCK_ROOT.length() + 1));
            //如果获取到的index为0,也就是第一个元素
            if (index == 0) {
                System.out.println("获取LOCK成功");
                return;
            }
            //如果不是第一个元素,获取上一个节点的路径
            final String s = children.get(index - 1);
            //监视它的上一个元素的变化情况,传入watcher对象
            final Stat exists = zooKeeper.exists(LOCK_ROOT.concat("/").concat(s), watcher);
            //继续去获取锁
            if (exists != null) {  //如果exists不为null,那么就阻塞,不然就尝试去获取锁
                LockSupport.park();
            }
            attemptLock();
        }
    
        public void unlock() {
            //删除锁的临时有序节点,并且关闭连接对象
            try {
                zooKeeper.delete(lockPath, -1);
                zooKeeper.close();
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:使用Zookeeper的Watcher实现分布式锁

          本文链接:https://www.haomeiwen.com/subject/xvgekltx.html