美文网首页个人学习
zookeeper 分布式锁

zookeeper 分布式锁

作者: 香沙小熊 | 来源:发表于2020-02-23 16:54 被阅读0次

1.步骤

  1. 每个客户端往持久化节点/parent_lock下创建有序临时节点/parent_lock/lock_。创建成功后/parent_lock下面会有每个客户端对应的节点,如/parent_lock/lock_000000001
  2. 客户端取得/parent_lock下子节点,并进行排序,判断排在最前面的是否为自己。
  3. 如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
  4. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
  5. 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
  6. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

2.实例

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class ZKLockUtil {

    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    //ZooKeeper配置信息
    private ZooKeeper zkClient;
    private String lockPath;


    // 监控lockPath的前一个节点的watcher
    private Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getPath() + " 前锁释放");
            System.out.println(event.getState() + " 前锁释放");
            synchronized (this) {
                notifyAll();
            }
        }
    };

    public ZKLockUtil() throws IOException {
        zkClient = new ZooKeeper("123.57.251.156:2181,123.57.251.156:2182,123.57.251.156:2183", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("失去连接");

                }
            }
        });
    }

    //获取锁的原语实现.
    public void acquireLock() throws InterruptedException, KeeperException {
        //创建锁节点
        createLock();
        //尝试获取锁
        attemptLock();
    }

    //创建锁的原语实现。在lock节点下创建该线程的锁节点
    private void createLock() throws KeeperException, InterruptedException {
        //如果根节点不存在,则创建根节点
        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 创建EPHEMERAL_SEQUENTIAL类型节点
        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
        this.lockPath = lockPath;
    }

    private void attemptLock() throws KeeperException, InterruptedException {
        // 获取Lock所有子节点,按照节点序号排序
        List<String> lockPaths = null;

        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);

        Collections.sort(lockPaths);

        for (String s :
        lockPaths ) {
            System.out.println("lockPaths:    "+s);
        }

        System.out.println("当前最小节点" + lockPath.substring   (LOCK_ROOT_PATH.length() + 1));
        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));

        // 如果lockPath是序号最小的节点,则获取锁
        if (index == 0) {
            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
            return;
        } else {
            // lockPath不是序号最小的节点,监控前一个节点
            String preLockPath = lockPaths.get(index - 1);

            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

            // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
            if (stat == null) {
                attemptLock();
            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                System.out.println(" 等待前锁释放,prelocakPath:" + preLockPath);
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }
    }

    //释放锁的原语实现
    public void releaseLock() throws KeeperException, InterruptedException {
        zkClient.delete(lockPath, -1);
        zkClient.close();
        System.out.println(" 锁释放:" + lockPath);
    }
}

public class TicketSeller {
    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

        TicketSeller ticketSeller = new TicketSeller();

        for (int i = 0; i < 8; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // System.out.println("开始执行第"+i+"个");
                    try {
                        ticketSeller.sellTicketWithLock();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("-------------------------");
                }
            }).start();
        }


    }

    private void sell() {
        System.out.println("售票开始");
        // 线程随机休眠数毫秒,模拟现实中的费时操作
        int sleepMillis = (int) (Math.random() * 2000);
        try {
            //代表复杂逻辑执行了一段时间
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("售票结束");
    }

    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
        ZKLockUtil lock = new ZKLockUtil();
        lock.acquireLock();
        sell();
        lock.releaseLock();
    }
}
分布式锁 优点 缺点
Zookeeper 1.有封装好的框架,容易实现
2.有等待锁的队列,大大提升抢锁效率。
添加和删除节点性能较低
Redis Set和Del指令性能较高 1.实现复杂,需要考虑超时,原子性,误删等情形。
2.没有等待锁的队列,只能在客户端自旋来等待,效率低下。

3.总结

其实如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。
大家都是上来直接创建一个锁节点下的一个接一个的临时顺序节点
如果自己不是第一个节点,就对自己上一个节点加监听器
只要上一个节点释放锁,自己就排到前面去了,相当于是一个排队机制。
而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。

特别感谢:

稀有气体

相关文章

网友评论

    本文标题:zookeeper 分布式锁

    本文链接:https://www.haomeiwen.com/subject/ynwjqhtx.html