美文网首页
Zookeeper实现分布式锁

Zookeeper实现分布式锁

作者: 名字是乱打的 | 来源:发表于2020-04-23 19:31 被阅读0次

    论文初稿搞定了,咱又有时间学习啦。瑟瑟发抖,还没接收到要开学的消息,啥时候能答辩啊。。。咱这不会被延期个把月毕业吧。好想去公司上班啊...但是省内学校不让开学也不让去实习...太稳健了吧...

    -------------------------------------废话分隔符-------------------------------------------

    最初看过Zookeeper实现分布式锁的设计思想,感觉类似于之前写的Redis链接
    ,都是通过只有一个客户端创建某个结点,只要结点存在其他的结点无法创建,实现一种资源的占用,后来又想想其实锁不就是这样嘛,也不是只是Zookeeper和Redis,就算是单进程各个线程之间的锁其实也是一种对象资源管控罢了。
    后面接触多了,发现其实Zookeeper和Redis的分布式锁还是不一样滴~


    一,先看一下有些部分类似于Redis的Zookeeper式分布式锁

    如图所示,我们三个客户端去监听同一节点,那么只有一个客户端比如A客户端能够创建成功,那么其余结点由于监听了节点事件,当A使用完毕更改了结点状态,其余节点也就知道可以开始再次抢占资源了。

    羊群效应:但是这样做有个缺点,由于众多结点监听该同一事件,那么每次发生节点变化虽然只有一个结点能够抢占资源,但是却惊动了整个群体,造成很多事件的变更,显然如果对于很多结点的分布式应用,这样并不划算。

    二,利用有序节点实现分布式锁

    如图所示,每个客户端都去锁结点下创建一个属于自己的有序临时结点,临时结点会监听一个比自己小的结点,如果自己是当前最小的,那么就代表你是可以被正常使用的(抢占资源)。
    这里以Java提供的Lock为基础进行扩展实现,并且以继承的方式加watcher机制。Zk操作使用ZK提供的原生Java API。

    分布式锁代码
    public class DistributedLock implements Lock, Watcher {
        private ZooKeeper zk=null;//创建zk客户端
        private String ROOT_LOCK="/locks";//定义一个根节点
        private String WAIT_LOCK;//等待前一个锁
        private String CURRENT_LOCK;//表示当前锁
        private CountDownLatch countDownLatch;
    
        public DistributedLock( ) {
            try {
                this.zk = new ZooKeeper("Ip:port", 4000, this);//这里watcher放在本类中实现了
                //判断根节点是否存在
                Stat stat=zk.exists(ROOT_LOCK,false);
                if (stat==null){
                    zk.create(ROOT_LOCK,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
    
        }
    
        @Override
        public boolean tryLock() {//当结点调用尝试获得锁方法时
            try {
                //创建临时有序结点并复制给当前锁
                CURRENT_LOCK=zk.create(ROOT_LOCK+"/","0".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//创建临时有序结点
                System.out.println(Thread.currentThread().getName()+"创建临时结点--->"+CURRENT_LOCK+"--->尝试获得锁");
                //获取孩子结点(锁结点)
                List<String> childrens = zk.getChildren(ROOT_LOCK, false);
                //创建孩子集合并添加到有序Set中
                SortedSet<String> nodes=new TreeSet<>();
                for (String children:childrens){
                    nodes.add(ROOT_LOCK+"/"+children);
                }
                String firstNode = nodes.first();
                //尝试获得比当前结点更小的结果集,以此来获取到最后一个最大的结点也就是本结点前一个结点
                //注意,这里不可以用nodes.last(),因为这样获取到的是全部结点最后一个结点,是大于本结点那个
                SortedSet<String> lessThenMe = ((TreeSet<String>)nodes).headSet(CURRENT_LOCK);
                if (CURRENT_LOCK.equals(firstNode)){//如果当前结点是集合中最小的结点则其获得锁
                    return true;
                }
                if (!lessThenMe.isEmpty()){//如果存在比当前结点更小的结点
                    WAIT_LOCK=lessThenMe.last();//获得该有序结合的最后一个结点,即前一个结点,设置给WAIT_LOCL
                    System.out.println("lessThenMe最后一个 : "+lessThenMe.last());
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public void lock() {
            if (this.tryLock()){//如果获得锁成功
                System.out.println(Thread.currentThread().getName()+"获得锁成功");
                return;
            }
            //如果没有成功获得锁,那么等待前一个结点释放锁
            waitForLock(WAIT_LOCK);
        }
    
        private boolean waitForLock(String preNode){
            try {
                //监听上一个比自己小的结点
                Stat stat=zk.exists(preNode,true);
                if (stat!=null){
                    System.out.println(Thread.currentThread().getName()+"等待--->"+ROOT_LOCK+"/"+preNode+"释放锁");
                    countDownLatch=new CountDownLatch(1);
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName()+"--->获得锁");
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public void unlock() {
            System.out.println(Thread.currentThread().getName()+"释放锁"+CURRENT_LOCK);
            try {
                zk.delete(CURRENT_LOCK,-1);//version=-1不管如何都会删除
                CURRENT_LOCK=null;
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            if (this.countDownLatch!=null){
                this.countDownLatch.countDown();
            }
        }
    }
    

    测试类

    public class ZkDistributedLockTest {
        public static void main(String[] args) throws IOException {
            CountDownLatch countDownLatch=new CountDownLatch(10);
            for(int i=0;i<10;i++){
                new Thread(()->{
                    try {
                        countDownLatch.await();
                        DistributedLock distributedLock=new DistributedLock();
                        distributedLock.lock();//当前线程尝试获得锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },"线程"+i).start();
                countDownLatch.countDown();
            }
            System.in.read();
        }
    }
    
    说一下思想:

    1.这里设定了一个倒计时CountDownLatch[1]用于控制和阻塞并发线程;
    2.我们让每个线程创建分布式锁工具类**DistributedLock,每个线程会连接到ZK服务器成为一个客户端,只有一个创建跟结点

    3.接下来每个线程都去尝试调用我们的lock()方法,我们的lock会去调用tryLock()方法
    4.首先,我们让每个线程在tryLock()里去创建一个临时顺序结点,跟线程名无关,先到者排序在前

    5.然后,我们会查出此时分布式锁根节点下的所有顺序结点,如果当前结点是最小的结点那么它将会获得锁,如果当前结点不是最小的结点,那么找到前一个结点并监听,并且利用countdownlatch设置倒计时为1,并阻塞当前线程[2]
    6.当前一个结点出现监听事件(删除,更改),我们释放countdownlatch,释放本线程

    如此,我们可以以分布式锁的形式实现Zk结点的顺序作用。


    上述提供的是一种自己实现watcher和lock利用原生zk API简单实现分布式锁,实际上curator提供了许多高度封装好的分布式锁简化了步骤,而且提供了更多细节操作,比如读写超市断开连接的选举等~ eg

    参考文献:
    [1] CountDownLauch解析.https://www.jianshu.com/p/a1a73ce99526
    [2]利用SortedSet实现结点排序,SortedSet.headSet(string key),可以取出小于key的结点,SortedSet.first获取第一个结点,SortedSet.last获取最后一个结点具体的https://blog.csdn.net/xjk201/article/details/81586209


    哈哈哈,写论文入魔了,搞个参考文献皮一下~

    相关文章

      网友评论

          本文标题:Zookeeper实现分布式锁

          本文链接:https://www.haomeiwen.com/subject/yngvmhtx.html