美文网首页个人学习
zookeeper 分布式锁

zookeeper 分布式锁

作者: 香沙小熊 | 来源:发表于2020-02-23 16:54 被阅读0次

    1.步骤

    1. 每个客户端往持久化节点/parent_lock下创建有序临时节点/parent_lock/lock_。创建成功后/parent_lock下面会有每个客户端对应的节点,如/parent_lock/lock_000000001
    2. 客户端取得/parent_lock下子节点,并进行排序,判断排在最前面的是否为自己。
    3. 如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
    4. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
    5. 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
    6. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

    2.实例

    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    public class ZKLockUtil {
    
        private static final String LOCK_ROOT_PATH = "/Locks";
        private static final String LOCK_NODE_NAME = "Lock_";
        //ZooKeeper配置信息
        private ZooKeeper zkClient;
        private String lockPath;
    
    
        // 监控lockPath的前一个节点的watcher
        private Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getPath() + " 前锁释放");
                System.out.println(event.getState() + " 前锁释放");
                synchronized (this) {
                    notifyAll();
                }
            }
        };
    
        public ZKLockUtil() throws IOException {
            zkClient = new ZooKeeper("123.57.251.156:2181,123.57.251.156:2182,123.57.251.156:2183", 10000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("失去连接");
    
                    }
                }
            });
        }
    
        //获取锁的原语实现.
        public void acquireLock() throws InterruptedException, KeeperException {
            //创建锁节点
            createLock();
            //尝试获取锁
            attemptLock();
        }
    
        //创建锁的原语实现。在lock节点下创建该线程的锁节点
        private void createLock() throws KeeperException, InterruptedException {
            //如果根节点不存在,则创建根节点
            Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
    
            // 创建EPHEMERAL_SEQUENTIAL类型节点
            String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                    Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
            this.lockPath = lockPath;
        }
    
        private void attemptLock() throws KeeperException, InterruptedException {
            // 获取Lock所有子节点,按照节点序号排序
            List<String> lockPaths = null;
    
            lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
    
            Collections.sort(lockPaths);
    
            for (String s :
            lockPaths ) {
                System.out.println("lockPaths:    "+s);
            }
    
            System.out.println("当前最小节点" + lockPath.substring   (LOCK_ROOT_PATH.length() + 1));
            int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    
            // 如果lockPath是序号最小的节点,则获取锁
            if (index == 0) {
                System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
                return;
            } else {
                // lockPath不是序号最小的节点,监控前一个节点
                String preLockPath = lockPaths.get(index - 1);
    
                Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
    
                // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
                if (stat == null) {
                    attemptLock();
                } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                    System.out.println(" 等待前锁释放,prelocakPath:" + preLockPath);
                    synchronized (watcher) {
                        watcher.wait();
                    }
                    attemptLock();
                }
            }
        }
    
        //释放锁的原语实现
        public void releaseLock() throws KeeperException, InterruptedException {
            zkClient.delete(lockPath, -1);
            zkClient.close();
            System.out.println(" 锁释放:" + lockPath);
        }
    }
    
    
    public class TicketSeller {
        public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    
            TicketSeller ticketSeller = new TicketSeller();
    
            for (int i = 0; i < 8; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // System.out.println("开始执行第"+i+"个");
                        try {
                            ticketSeller.sellTicketWithLock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        System.out.println("-------------------------");
                    }
                }).start();
            }
    
    
        }
    
        private void sell() {
            System.out.println("售票开始");
            // 线程随机休眠数毫秒,模拟现实中的费时操作
            int sleepMillis = (int) (Math.random() * 2000);
            try {
                //代表复杂逻辑执行了一段时间
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("售票结束");
        }
    
        public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
            ZKLockUtil lock = new ZKLockUtil();
            lock.acquireLock();
            sell();
            lock.releaseLock();
        }
    }
    
    分布式锁 优点 缺点
    Zookeeper 1.有封装好的框架,容易实现
    2.有等待锁的队列,大大提升抢锁效率。
    添加和删除节点性能较低
    Redis Set和Del指令性能较高 1.实现复杂,需要考虑超时,原子性,误删等情形。
    2.没有等待锁的队列,只能在客户端自旋来等待,效率低下。

    3.总结

    其实如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。
    大家都是上来直接创建一个锁节点下的一个接一个的临时顺序节点
    如果自己不是第一个节点,就对自己上一个节点加监听器
    只要上一个节点释放锁,自己就排到前面去了,相当于是一个排队机制。
    而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。

    特别感谢:

    稀有气体

    相关文章

      网友评论

        本文标题:zookeeper 分布式锁

        本文链接:https://www.haomeiwen.com/subject/ynwjqhtx.html