美文网首页
(10)分布式锁的手动实现

(10)分布式锁的手动实现

作者: Mrsunup | 来源:发表于2018-11-26 22:04 被阅读0次

    基本实现思路为基于zookeeper的本身的临时有序节点特性来实现分布式锁,如下图所示

    每个客户端都往zookeeper上创建一个临时有序节点,注册完后,取/locks下所有节点中最小的节点,则创建该最小节点的客户端拿到锁。如果当前最小节点是/lock_seq1,那么/lock_seq2边监听/lock_seq1边阻塞等待,当/lock_seq1被删除后,/lock_seq2节点的客户端获得锁,以此类推。
    每个节点监听比自己小一个数的临时节点,当监听的节点被删除的(此时认为是释放锁),监听事件被触发

    1.代码演示

    /**
     * @Project: 3.DistributedProject
     * @description:  手动实现分布式锁
     * @author: sunkang
     * @create: 2018-06-24 08:19
     * @ModificationHistory who      when       What
     **/
    public class DistributedLocks implements Lock,Watcher {
        private ZooKeeper zooKeeper = null;
        private String ROOT_LOCK ="/locks";
        private String CURRENT_LOCK ;
        private String WAIT_LOCK;
        private CountDownLatch  countDownLatch;
    
        //连接才开始操作的
        private CountDownLatch sysConectDownLatch;
    
        public DistributedLocks() {
            try {
                //建立zookeeper的连接
                sysConectDownLatch = new CountDownLatch(1);
                zooKeeper = new ZooKeeper("192.168.44.129:2181",40000,this);
                sysConectDownLatch.await();
                //是否存在根节点,如果不存在,则去创建持久化节点
              Stat stat = zooKeeper.exists(ROOT_LOCK,false);
              if(stat == null){
                  zooKeeper.create(ROOT_LOCK,"0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
              }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void lock() {
            if(tryLock()){
                System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
                return ;
            }
            //添加监控
            waitForLocks(WAIT_LOCK);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
        }
    
        //尝试获取锁
        @Override
        public boolean tryLock() {
            try {
                //创建当前的节点
                CURRENT_LOCK = zooKeeper.create(ROOT_LOCK+"/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                //获取当前的所有的子节点
                System.out.println(Thread.currentThread().getName()+"->"+ CURRENT_LOCK+",尝试竞争锁");
                List<String> childrens =  zooKeeper.getChildren(ROOT_LOCK,false);
                SortedSet<String> treeSet =new  TreeSet<String>();
                for(String children : childrens){
                    treeSet.add(ROOT_LOCK+"/"+children);
                }
                //获取最小的节点
                String minNode = treeSet.first();
                if(CURRENT_LOCK.equals(minNode)){
                    return true;
                }
                //获取当前节点的上一个节点
               SortedSet<String> lessCurrentSets=  treeSet.headSet(CURRENT_LOCK);
                if(lessCurrentSets != null){
                    WAIT_LOCK = lessCurrentSets.last();
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        //对上一节点记性监控
        private boolean waitForLocks(String pre){
            try {
                //进行监控
                Stat stat=  zooKeeper.exists(pre,true);
                if(stat!=null){
                    System.out.println("当前线程"+Thread.currentThread().getName()+"等待"+WAIT_LOCK+"释放");
                     countDownLatch = new CountDownLatch(1);
                    countDownLatch.await();
                    System.out.println("当前线程"+Thread.currentThread().getName()+"获得锁");
                    return true;
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public void unlock() {
            System.out.println(Thread.currentThread().getName()+ "->释放锁"+ CURRENT_LOCK);
            try {
                zooKeeper.delete(CURRENT_LOCK,-1);
                CURRENT_LOCK=null;
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            if( watchedEvent.getState().equals(Event.KeeperState.SyncConnected)){
                sysConectDownLatch.countDown();//说明已经建立了连接
            }
            //当节点的信息发送删除的时候就会触发该监控
            if(this.countDownLatch != null){
                countDownLatch.countDown();
            }
        }
    }
    

    2.测试演示

    • 测试代码,启动是个线程,模仿是个客户端同时访问zookeeper
    /**
     * @Project: 3.DistributedProject
     * @description: 测试
     * @author: sunkang
     * @create: 2018-06-24 08:44
     * @ModificationHistory who      when       What
     **/
    public class LocksDemo {
        public static void main(String[] args) throws IOException {
            final CountDownLatch countDownLatch = new CountDownLatch(10);
            for(int i=0;i<10;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        DistributedLocks lock = null;
                        try {
                            countDownLatch.await();
                            lock = new DistributedLocks();
                            lock.lock(); //获得锁
                            //todo  做一些事情
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            lock.unlock();
                        }
                    }
                },"Thread-"+i).start();
                countDownLatch.countDown();
            }
        }
    }
    
    • 观察结果 :可以看到每2秒之后,或有线程获取锁
    Thread-1->/locks/0000000087,尝试竞争锁
    Thread-7->/locks/0000000088,尝试竞争锁
    Thread-4->/locks/0000000089,尝试竞争锁
    Thread-8->/locks/0000000091,尝试竞争锁
    Thread-3->/locks/0000000090,尝试竞争锁
    Thread-2->/locks/0000000092,尝试竞争锁
    Thread-9->/locks/0000000095,尝试竞争锁
    Thread-5->/locks/0000000094,尝试竞争锁
    Thread-6->/locks/0000000096,尝试竞争锁
    Thread-0->/locks/0000000093,尝试竞争锁
    Thread-1->/locks/0000000087->获得锁成功
    当前线程Thread-2等待/locks/0000000091释放
    当前线程Thread-8等待/locks/0000000090释放
    当前线程Thread-4等待/locks/0000000088释放
    当前线程Thread-9等待/locks/0000000094释放
    当前线程Thread-6等待/locks/0000000095释放
    当前线程Thread-7等待/locks/0000000087释放
    当前线程Thread-0等待/locks/0000000092释放
    当前线程Thread-3等待/locks/0000000089释放
    当前线程Thread-5等待/locks/0000000093释放
    Thread-1->释放锁/locks/0000000087
    当前线程Thread-7获得锁
    Thread-7->释放锁/locks/0000000088
    当前线程Thread-4获得锁
    Thread-4->释放锁/locks/0000000089
    当前线程Thread-3获得锁
    Thread-3->释放锁/locks/0000000090
    当前线程Thread-8获得锁
    Thread-8->释放锁/locks/0000000091
    当前线程Thread-2获得锁
    Thread-2->释放锁/locks/0000000092
    当前线程Thread-0获得锁
    Thread-0->释放锁/locks/0000000093
    当前线程Thread-5获得锁
    Thread-5->释放锁/locks/0000000094
    当前线程Thread-9获得锁
    Thread-9->释放锁/locks/0000000095
    当前线程Thread-6获得锁
    Thread-6->释放锁/locks/0000000096
    

    相关文章

      网友评论

          本文标题:(10)分布式锁的手动实现

          本文链接:https://www.haomeiwen.com/subject/ethiqqtx.html