美文网首页Java学习笔记程序员
Zookeeper实现分布式锁

Zookeeper实现分布式锁

作者: 一只小哈 | 来源:发表于2017-02-06 09:23 被阅读15716次

    前几天分析了一下三种分布式锁的实现,但是没有利用zookeeper实现一个分布式锁,因为感觉基于Zookeeper实现分布式锁还是稍微复杂的,同时也需要使用Watcher机制,所以就单独搞一篇Zookeeper实现的分布式锁。

    首先,第一种实现。我们可以利用Zookeeper不能重复创建一个节点的特性来实现一个分布式锁,这看起来和redis实现分布式锁很像。但是也是有差异的,后面会详细分析。
    主要流程图如下:

    Paste_Image.png

    上面的流程很简单:

    1. 查看目标Node是否已经创建,已经创建,那么等待锁。
    2. 如果未创建,创建一个瞬时Node,表示已经占有锁。
    3. 如果创建失败,那么证明锁已经被其他线程占有了,那么同样等待锁。
    4. 当释放锁,或者当前Session超时的时候,节点被删除,唤醒之前等待锁的线程去争抢锁。

    上面是一个完整的流程,简单的代码实现如下:

    package com.codertom.params.engine;
    
    import com.google.common.base.Strings;
    import org.apache.zookeeper.*;
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Lock;
    
    /**
     * Zookeepr实现分布式锁
     */
    public class LockTest {
    
        private String zkQurom = "localhost:2181";
    
        private String lockNameSpace = "/mylock";
    
        private String nodeString = lockNameSpace + "/test1";
    
        private Lock mainLock;
    
        private ZooKeeper zk;
    
        public LockTest(){
            try {
                zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("Receive event "+watchedEvent);
                        if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                            System.out.println("connection is established...");
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
        private void ensureRootPath() throws InterruptedException {
            try {
                if (zk.exists(lockNameSpace,true)==null){
                    zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        private void watchNode(String nodeString, final Thread thread) throws InterruptedException {
            try {
                zk.exists(nodeString, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println( "==" + watchedEvent.toString());
                        if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                            System.out.println("Threre is a Thread released Lock==============");
                            thread.interrupt();
                        }
                        try {
                            zk.exists(nodeString,new Watcher() {
                                @Override
                                public void process(WatchedEvent watchedEvent) {
                                    System.out.println( "==" + watchedEvent.toString());
                                    if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                        System.out.println("Threre is a Thread released Lock==============");
                                        thread.interrupt();
                                    }
                                    try {
                                        zk.exists(nodeString,true);
                                    } catch (KeeperException e) {
                                        e.printStackTrace();
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }
    
                            });
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                });
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 获取锁
         * @return
         * @throws InterruptedException
         */
        public boolean lock() throws InterruptedException {
            String path = null;
            ensureRootPath();
            watchNode(nodeString,Thread.currentThread());
            while (true) {
                try {
                    path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (KeeperException e) {
                    System.out.println(Thread.currentThread().getName() + "  getting Lock but can not get");
                    try {
                        Thread.sleep(5000);
                    }catch (InterruptedException ex){
                        System.out.println("thread is notify");
                    }
                }
                if (!Strings.nullToEmpty(path).trim().isEmpty()) {
                    System.out.println(Thread.currentThread().getName() + "  get Lock...");
                    return true;
                }
            }
        }
    
        /**
         * 释放锁
         */
        public void unlock(){
            try {
                zk.delete(nodeString,-1);
                System.out.println("Thread.currentThread().getName() +  release Lock...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String args[]) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(10);
            for (int i = 0;i<4;i++){
                service.execute(()-> {
                    LockTest test = new LockTest();
                    try {
                        test.lock();
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.unlock();
                });
            }
            service.shutdown();
        }
    }
    
    

    代码比较糙,但是大致的实现思路和上述一致,这里需要注意:

    1. 因为使用的是原生的Zookeeper API实现,Watch需要重复的设置,所以代码复杂的些。
    2. 唤醒直接用的Thread.interupt这样其实控制流程其实是不好的。

    其实上面的实现有优点也有缺点:
    优点:
    实现比较简单,有通知机制,能提供较快的响应,有点类似reentrantlock的思想,对于节点删除失败的场景由Session超时保证节点能够删除掉。
    缺点:
    重量级,同时在大量锁的情况下会有“惊群”的问题。

    “惊群”就是在一个节点删除的时候,大量对这个节点的删除动作有订阅Watcher的线程会进行回调,这对Zk集群是十分不利的。所以需要避免这种现象的发生。

    解决“惊群”:

    为了解决“惊群“问题,我们需要放弃订阅一个节点的策略,那么怎么做呢?

    1. 我们将锁抽象成目录,多个线程在此目录下创建瞬时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
    2. 首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
    3. 获取锁失败的节点获取当前节点上一个顺序节点,对此节点注册监听,当节点删除的时候通知当前节点。
    4. 当unlock的时候删除节点之后会通知下一个节点。

    上面的实现和reentrantlock的公平锁实现还是比较类似的,下面是简单的实现:

    package com.codertom.params.engine;
    
    import com.google.common.base.Strings;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by zhiming on 2017-02-05.
     */
    public class FairLockTest {
    
        private String zkQurom = "localhost:2181";
    
        private String lockName = "/mylock";
    
        private String lockZnode = null;
    
        private ZooKeeper zk;
    
        public FairLockTest(){
            try {
                zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("Receive event "+watchedEvent);
                        if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                            System.out.println("connection is established...");
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
        private void ensureRootPath(){
            try {
                if (zk.exists(lockName,true)==null){
                    zk.create(lockName,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        /**
         * 获取锁
         * @return
         * @throws InterruptedException
         */
        public void lock(){
            String path = null;
            ensureRootPath();
                try {
                    path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    lockZnode = path;
                    List<String> minPath = zk.getChildren(lockName,false);
                    System.out.println(minPath);
                    Collections.sort(minPath);
                    System.out.println(minPath.get(0)+" and path "+path);
                    if (!Strings.nullToEmpty(path).trim().isEmpty()&&!Strings.nullToEmpty(minPath.get(0)).trim().isEmpty()&&path.equals(lockName+"/"+minPath.get(0))) {
                        System.out.println(Thread.currentThread().getName() + "  get Lock...");
                        return;
                    }
                    String watchNode = null;
                    for (int i=minPath.size()-1;i>=0;i--){
                        if(minPath.get(i).compareTo(path.substring(path.lastIndexOf("/") + 1))<0){
                            watchNode = minPath.get(i);
                            break;
                        }
                    }
    
                    if (watchNode!=null){
                        final String watchNodeTmp = watchNode;
                        final Thread thread = Thread.currentThread();
                        Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() {
                            @Override
                            public void process(WatchedEvent watchedEvent) {
                                if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                    thread.interrupt();
                                }
                                try {
                                    zk.exists(lockName + "/" + watchNodeTmp,true);
                                } catch (KeeperException e) {
                                    e.printStackTrace();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
    
                        });
                        if(stat != null){
                            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
                        }
                    }
                    try {
                        Thread.sleep(1000000000);
                    }catch (InterruptedException ex){
                        System.out.println(Thread.currentThread().getName() + " notify");
                        System.out.println(Thread.currentThread().getName() + "  get Lock...");
                        return;
                    }
    
                } catch (Exception e) {
                   e.printStackTrace();
                }
        }
    
        /**
         * 释放锁
         */
        public void unlock(){
            try {
                System.out.println(Thread.currentThread().getName() +  "release Lock...");
                zk.delete(lockZnode,-1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
    
        public static void main(String args[]) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(10);
            for (int i = 0;i<4;i++){
                service.execute(()-> {
                    FairLockTest test = new FairLockTest();
                    try {
                        test.lock();
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.unlock();
                });
            }
            service.shutdown();
        }
    
    }
    
    

    同样上面的程序也有几点需要注意:

    1. Zookeeper的API没有提供直接的获取上一个节点或者最小节点的API需要我们自己实现。
    2. 使用了interrupt做线程的唤醒,这样不科学,因为不想将JVM的lock引进来所以没有用countdownlatch来做流程控制。
    3. Watch也是要重新设置的,这里使用了Watch的复用,所以代码简单些。

    其实上面的实现还是很复杂的,因为你需要反复的去关注Watcher,实现一个Demo可以,做一个生产环境可用的Lock并不容易。因为你的代码bug在生产环境上会引起很严重的bug。

    其实对于Zookeeper的一些常用功能是有一些成熟的包实现的,像Curator。Curator的确是足够牛逼,不仅封装了Zookeeper的常用API,也包装了很多常用Case的实现。但是它的编程风格其实还是吧比较难以接受的。

    可以用Curator轻易的实现一个分布式锁:

    InterProcessMutex lock = new InterProcessMutex(client, lockPath);
    if ( lock.acquire(maxWait, waitUnit) ) 
    {
        try 
        {
            // do some work inside of the critical section here
        }
        finally
        {
            lock.release();
        }
    }
    

    是的就这么简单,一个直接拿过来可用的轮子。

    基于Zookeeper的分布式锁就说完了。基于Zookeeper实现分布式锁,其实是不常用的。虽然它实现锁十分优雅,但编程复杂,同时还要单独维护一套Zookeeper集群,频繁的Watch对Zookeeper集群的压力还是蛮大的,如果不是原有的项目以来Zookeeper,同时锁的量级比较小的话,还是不用为妙。

    相关文章

      网友评论

        本文标题:Zookeeper实现分布式锁

        本文链接:https://www.haomeiwen.com/subject/vbnbbttx.html