美文网首页
(10)分布式锁的手动实现

(10)分布式锁的手动实现

作者: Mrsunup | 来源:发表于2018-11-26 22:04 被阅读0次

基本实现思路为基于zookeeper的本身的临时有序节点特性来实现分布式锁,如下图所示

每个客户端都往zookeeper上创建一个临时有序节点,注册完后,取/locks下所有节点中最小的节点,则创建该最小节点的客户端拿到锁。如果当前最小节点是/lock_seq1,那么/lock_seq2边监听/lock_seq1边阻塞等待,当/lock_seq1被删除后,/lock_seq2节点的客户端获得锁,以此类推。
每个节点监听比自己小一个数的临时节点,当监听的节点被删除的(此时认为是释放锁),监听事件被触发

1.代码演示

/**
 * @Project: 3.DistributedProject
 * @description:  手动实现分布式锁
 * @author: sunkang
 * @create: 2018-06-24 08:19
 * @ModificationHistory who      when       What
 **/
public class DistributedLocks implements Lock,Watcher {
    private ZooKeeper zooKeeper = null;
    private String ROOT_LOCK ="/locks";
    private String CURRENT_LOCK ;
    private String WAIT_LOCK;
    private CountDownLatch  countDownLatch;

    //连接才开始操作的
    private CountDownLatch sysConectDownLatch;

    public DistributedLocks() {
        try {
            //建立zookeeper的连接
            sysConectDownLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper("192.168.44.129:2181",40000,this);
            sysConectDownLatch.await();
            //是否存在根节点,如果不存在,则去创建持久化节点
          Stat stat = zooKeeper.exists(ROOT_LOCK,false);
          if(stat == null){
              zooKeeper.create(ROOT_LOCK,"0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
          }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        if(tryLock()){
            System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
            return ;
        }
        //添加监控
        waitForLocks(WAIT_LOCK);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    //尝试获取锁
    @Override
    public boolean tryLock() {
        try {
            //创建当前的节点
            CURRENT_LOCK = zooKeeper.create(ROOT_LOCK+"/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            //获取当前的所有的子节点
            System.out.println(Thread.currentThread().getName()+"->"+ CURRENT_LOCK+",尝试竞争锁");
            List<String> childrens =  zooKeeper.getChildren(ROOT_LOCK,false);
            SortedSet<String> treeSet =new  TreeSet<String>();
            for(String children : childrens){
                treeSet.add(ROOT_LOCK+"/"+children);
            }
            //获取最小的节点
            String minNode = treeSet.first();
            if(CURRENT_LOCK.equals(minNode)){
                return true;
            }
            //获取当前节点的上一个节点
           SortedSet<String> lessCurrentSets=  treeSet.headSet(CURRENT_LOCK);
            if(lessCurrentSets != null){
                WAIT_LOCK = lessCurrentSets.last();
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    //对上一节点记性监控
    private boolean waitForLocks(String pre){
        try {
            //进行监控
            Stat stat=  zooKeeper.exists(pre,true);
            if(stat!=null){
                System.out.println("当前线程"+Thread.currentThread().getName()+"等待"+WAIT_LOCK+"释放");
                 countDownLatch = new CountDownLatch(1);
                countDownLatch.await();
                System.out.println("当前线程"+Thread.currentThread().getName()+"获得锁");
                return true;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+ "->释放锁"+ CURRENT_LOCK);
        try {
            zooKeeper.delete(CURRENT_LOCK,-1);
            CURRENT_LOCK=null;
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        if( watchedEvent.getState().equals(Event.KeeperState.SyncConnected)){
            sysConectDownLatch.countDown();//说明已经建立了连接
        }
        //当节点的信息发送删除的时候就会触发该监控
        if(this.countDownLatch != null){
            countDownLatch.countDown();
        }
    }
}

2.测试演示

  • 测试代码,启动是个线程,模仿是个客户端同时访问zookeeper
/**
 * @Project: 3.DistributedProject
 * @description: 测试
 * @author: sunkang
 * @create: 2018-06-24 08:44
 * @ModificationHistory who      when       What
 **/
public class LocksDemo {
    public static void main(String[] args) throws IOException {
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i=0;i<10;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedLocks lock = null;
                    try {
                        countDownLatch.await();
                        lock = new DistributedLocks();
                        lock.lock(); //获得锁
                        //todo  做一些事情
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        lock.unlock();
                    }
                }
            },"Thread-"+i).start();
            countDownLatch.countDown();
        }
    }
}
  • 观察结果 :可以看到每2秒之后,或有线程获取锁
Thread-1->/locks/0000000087,尝试竞争锁
Thread-7->/locks/0000000088,尝试竞争锁
Thread-4->/locks/0000000089,尝试竞争锁
Thread-8->/locks/0000000091,尝试竞争锁
Thread-3->/locks/0000000090,尝试竞争锁
Thread-2->/locks/0000000092,尝试竞争锁
Thread-9->/locks/0000000095,尝试竞争锁
Thread-5->/locks/0000000094,尝试竞争锁
Thread-6->/locks/0000000096,尝试竞争锁
Thread-0->/locks/0000000093,尝试竞争锁
Thread-1->/locks/0000000087->获得锁成功
当前线程Thread-2等待/locks/0000000091释放
当前线程Thread-8等待/locks/0000000090释放
当前线程Thread-4等待/locks/0000000088释放
当前线程Thread-9等待/locks/0000000094释放
当前线程Thread-6等待/locks/0000000095释放
当前线程Thread-7等待/locks/0000000087释放
当前线程Thread-0等待/locks/0000000092释放
当前线程Thread-3等待/locks/0000000089释放
当前线程Thread-5等待/locks/0000000093释放
Thread-1->释放锁/locks/0000000087
当前线程Thread-7获得锁
Thread-7->释放锁/locks/0000000088
当前线程Thread-4获得锁
Thread-4->释放锁/locks/0000000089
当前线程Thread-3获得锁
Thread-3->释放锁/locks/0000000090
当前线程Thread-8获得锁
Thread-8->释放锁/locks/0000000091
当前线程Thread-2获得锁
Thread-2->释放锁/locks/0000000092
当前线程Thread-0获得锁
Thread-0->释放锁/locks/0000000093
当前线程Thread-5获得锁
Thread-5->释放锁/locks/0000000094
当前线程Thread-9获得锁
Thread-9->释放锁/locks/0000000095
当前线程Thread-6获得锁
Thread-6->释放锁/locks/0000000096

相关文章

网友评论

      本文标题:(10)分布式锁的手动实现

      本文链接:https://www.haomeiwen.com/subject/ethiqqtx.html