美文网首页
zookeeper分布式锁

zookeeper分布式锁

作者: 紫色红色黑色 | 来源:发表于2019-12-11 23:53 被阅读0次

描述

zookeeper实现分布式锁原理:多个客户端去节点下分别创建临时顺序的节点。按顺序最小的节点对应的客户端获取锁。没有获取锁的客户端对前一个节点增加监听节点删除事件。释放锁时将节点删除,此时服务器发送监听事件通知客户端去获取锁。

思考

1.分布式锁包含本地锁和共享资源锁。例如一个客户端多个线程去获取分布式锁,应该先用本地锁控制,在放一个线程去获取共享资源锁。
2.zookeeper移除watch,客户端通过removeWatches()实现。入参需要指定WatcherType。

Watcher.WatcherType:Children,Data,Any
通过getChildren()添加的watch,使用WatcherType是Children
通过exists()/getData()添加的watch,使用WatcherType是Data

代码实现

下面的分布式锁,没有本地锁。


public class ZkDistributedLock {

    private static final String lockPrefix = "/update";
    private static final int retryTime = 3;

    private ZooKeeper zooKeeper;

    private String znodeRoot;

    private AtomicReference<String> znodeLock = new AtomicReference<>(null);


    public ZkDistributedLock(ZooKeeper zooKeeper, String znodeRoot) {
        this.znodeRoot = znodeRoot;
        this.zooKeeper = zooKeeper;

        try {
            if (zooKeeper.exists(this.znodeRoot, false) == null) {
                zooKeeper.create(znodeRoot, znodeRoot.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void lock() throws Exception {
        String myLock = zooKeeper.create(znodeRoot + lockPrefix, lockPrefix.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        if (tryAcquire(myLock) != null) {
            System.out.println(Thread.currentThread().getName() + " lock " + myLock);
        }
    }

    public String tryAcquire(String myZnode) throws Exception {
        while (true) {

            List<String> children = zooKeeper.getChildren(znodeRoot, false);
            Collections.sort(children);

            String myLockString = myZnode.substring(8);

            if (myLockString.equals(children.get(0))) {
                znodeLock.set(myZnode);
                break;
            } else {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                int i = children.indexOf(myLockString);

                Stat exists = zooKeeper.exists(znodeRoot + "/" + children.get(i - 1), event -> {
                    if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
                        countDownLatch.countDown();
                    }
                });

                // 如果watch的节点已经删除,则移除watch和countDownLatch,继续获取锁
                if (exists == null) {
                    /**
                     * 移除watch
                     * Watcher.WatcherType:Children,Data,Any
                     * 通过getChildren()添加的watch,使用WatcherType是Children
                     * 通过exists()/getData()添加的watch,使用WatcherType是Data
                     *
                     * https://colobu.com/2014/09/28/ZooKeeper-Programmers-Guide/#Remove_Watches
                     */
                    zooKeeper.removeAllWatches(znodeRoot + "/" + children.get(i - 1), Watcher.WatcherType.Data, true);
                    countDownLatch.countDown();
                }
                System.out.println(Thread.currentThread().getName() + "_znode:" + myZnode + "_wait:" + children.get(i - 1));
                countDownLatch.await();
            }
        }
        return znodeLock.get();
    }

    public void unLock() {
        int actualTime = 0;
        do {
            try {
                zooKeeper.delete(znodeLock.get(), -1);
                System.out.println(Thread.currentThread().getName() + " unlock " + znodeLock.get());
                znodeLock.set(null);

                break;
            } catch (Exception e) {
                if (++actualTime == retryTime) {
                    e.printStackTrace();
                }
            }
        } while (actualTime < retryTime);
    }

}

测试

public static void main(String[] args) throws Exception {
        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        ZkCli zkCli = new ZkCli(connectString);
        ZooKeeper zooKeeper = zkCli.getZooKeeper();
        ZkDistributedLock lock = new ZkDistributedLock(zooKeeper, "/myLock");

        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {

            pool.execute(() -> {
                try {
                    lock.lock();
//                    Thread.sleep(1000);
                    System.out.println("do some thing");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unLock();
                }
            });
        }

        pool.shutdown();
    }

引用

http://www.dengshenyu.com/java/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/10/23/zookeeper-distributed-lock.html
https://colobu.com/2014/09/28/ZooKeeper-Programmers-Guide/#Remove_Watches
https://juejin.im/post/5bbb0d8df265da0abd3533a5
https://www.cnblogs.com/cc11001100/p/10269494.html

相关文章

网友评论

      本文标题:zookeeper分布式锁

      本文链接:https://www.haomeiwen.com/subject/tbqzgctx.html