美文网首页zookeeper专题
zookeeper原生java手写实现分布式锁

zookeeper原生java手写实现分布式锁

作者: 先生zeng | 来源:发表于2019-07-12 19:56 被阅读0次
/**
 * A distributed mutual-exclusion lock built directly on the ZooKeeper client API.
 *
 * <p>Every contender creates an ephemeral sequential child under {@code /locks}. The contender
 * that owns the lowest-numbered child holds the lock. Every other contender sets a watch on the
 * child immediately before its own, sleeps until a watcher event arrives, and then re-reads the
 * queue before deciding whether it now holds the lock.
 *
 * <p>Each instance owns one ZooKeeper session, which {@link #unlock()} closes; an instance is
 * therefore good for a single successful lock/unlock cycle. The lock is not reentrant and an
 * instance must not be shared between threads.
 *
 * <p>NOTE(review): a session whose acquisition attempt failed or timed out is only closed by
 * {@link #unlock()} or on an unrecoverable error; consider exposing an explicit close method.
 *
 * @version 2019/7/12
 * @since 2019/7/12
 */
public class DistributedLock implements Lock,Watcher {

    /** Parent node under which every contender creates its queue entry. */
    private static final String ROOT_LOCK = "/locks";

    private ZooKeeper zk;

    /** Full path of the node created by this instance, or {@code null} when it is not queued. */
    private String currentLock;

    /** Latch the acquiring thread sleeps on; written by that thread, read by the event thread. */
    private volatile CountDownLatch countDownLatch;

    /**
     * Opens a ZooKeeper session and makes sure the root lock node exists.
     *
     * @throws IllegalStateException if the session cannot be opened or the root node cannot be
     *     verified or created
     */
    public DistributedLock() {
        try {
            // The connect string must point at the ZooKeeper ensemble to use.
            zk = new ZooKeeper("localhost:2181", 4000, this);

            if (zk.exists(ROOT_LOCK, false) == null) {
                try {
                    zk.create(ROOT_LOCK, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException e) {
                    // Another client may have created the root between our check and our
                    // create; that is fine. Anything else leaves the root missing.
                    if (zk.exists(ROOT_LOCK, false) == null) {
                        throw e;
                    }
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException("Cannot open ZooKeeper session", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            closeSession();
            throw new IllegalStateException("Interrupted while preparing " + ROOT_LOCK, e);
        } catch (KeeperException e) {
            closeSession();
            throw new IllegalStateException("Cannot prepare root node " + ROOT_LOCK, e);
        }
    }

    /**
     * Blocks until the lock is held.
     *
     * @throws IllegalStateException if ZooKeeper reports an error or the thread is interrupted
     *     while waiting (the interrupt flag is restored); the lock is not held in that case
     */
    @Override
    public void lock() {
        try {
            acquire(false, 0L);
        } catch (KeeperException e) {
            throw new IllegalStateException("Failed to acquire lock under " + ROOT_LOCK, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while acquiring lock under " + ROOT_LOCK, e);
        }
    }

    /**
     * Blocks until the lock is held or the thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalStateException if ZooKeeper reports an error
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        try {
            acquire(false, 0L);
        } catch (KeeperException e) {
            throw new IllegalStateException("Failed to acquire lock under " + ROOT_LOCK, e);
        }
    }

    /**
     * Attempts to take the lock without waiting.
     *
     * @return {@code true} if this instance now holds the lock; {@code false} if another
     *     contender is ahead of it or ZooKeeper reported an error. On {@code false} the queue
     *     entry created for the attempt has been removed again.
     */
    @Override
    public boolean tryLock() {
        try {
            enqueue();
            if (findPredecessor() == null) {
                return true;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // A node left behind would sit in the queue and block every later contender.
        abandonNode();
        return false;
    }

    /**
     * Attempts to take the lock, waiting at most the given time.
     *
     * @param time maximum time to wait; non-positive values mean "do not wait"
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} on timeout or ZooKeeper error
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            return acquire(true, Math.max(0L, unit.toNanos(time)));
        } catch (KeeperException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Releases the lock by deleting this instance's node, then closes the ZooKeeper session.
     *
     * @throws IllegalMonitorStateException if this instance does not currently own a queue node
     */
    @Override
    public void unlock() {
        String node = currentLock;
        if (node == null) {
            throw new IllegalMonitorStateException("Lock is not held by this instance");
        }
        System.out.println(Thread.currentThread().getName()+"->释放锁"+node);

        try {
            zk.delete(node, -1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            e.printStackTrace();
        } finally {
            currentLock = null;
            // Always close: ending the session also removes the ephemeral node if the
            // explicit delete above did not succeed.
            closeSession();
        }
    }

    /**
     * Conditions are not supported by this lock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("Conditions are not supported by DistributedLock");
    }

    /**
     * Watcher callback. Wakes the acquiring thread on any event; that thread re-reads the queue
     * state itself, so an unrelated or early event can never grant the lock by mistake.
     *
     * @param watchedEvent the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        CountDownLatch latch = this.countDownLatch;
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Queues up and waits until this instance owns the lowest node.
     *
     * @param timed whether {@code timeoutNanos} limits the wait
     * @param timeoutNanos maximum wait in nanoseconds when {@code timed} is set
     * @return {@code true} once the lock is held, {@code false} if a timed wait ran out
     */
    private boolean acquire(boolean timed, long timeoutNanos) throws KeeperException, InterruptedException {
        final long deadline = System.nanoTime() + timeoutNanos;
        enqueue();
        boolean acquired = false;
        try {
            while (true) {
                String predecessor = findPredecessor();
                if (predecessor == null) {
                    acquired = true;
                    System.out.println(Thread.currentThread().getName()+"==="+currentLock+"->获得锁成功");
                    return true;
                }
                // The predecessor disappearing does not by itself mean we are first (it may
                // have given up while still waiting), so loop and re-read the queue.
                if (!waitForLock(predecessor, timed, deadline)) {
                    return false;
                }
            }
        } finally {
            if (!acquired) {
                abandonNode();
            }
        }
    }

    /** Creates this instance's ephemeral sequential node and remembers its full path. */
    private void enqueue() throws KeeperException, InterruptedException {
        if (currentLock != null) {
            throw new IllegalStateException("Lock is not reentrant; already queued as " + currentLock);
        }
        // create() returns the actual path including the generated sequence suffix.
        currentLock = zk.create(ROOT_LOCK+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName()+"->"+currentLock+",尝试竞争锁");
    }

    /**
     * Finds the queue node directly ahead of this instance's node.
     *
     * @return the full path of that node, or {@code null} if this instance's node is the lowest
     */
    private String findPredecessor() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren(ROOT_LOCK, false);
        TreeSet<String> ordered = new TreeSet<>();
        for (String child : children) {
            ordered.add(ROOT_LOCK+"/"+child);
        }
        if (!ordered.contains(currentLock)) {
            throw new IllegalStateException("Own lock node " + currentLock + " no longer exists");
        }
        return ordered.lower(currentLock);
    }

    /**
     * Waits for the next watcher event while {@code prev} exists.
     *
     * @param prev full path of the node to watch
     * @param timed whether {@code deadlineNanos} limits the wait
     * @param deadlineNanos {@link System#nanoTime()} value at which a timed wait gives up
     * @return {@code false} only if a timed wait expired before any event arrived
     */
    private boolean waitForLock(String prev, boolean timed, long deadlineNanos) throws KeeperException, InterruptedException {
        // Publish the latch before registering the watch so that a notification arriving
        // right after exists() returns cannot be lost.
        CountDownLatch latch = new CountDownLatch(1);
        countDownLatch = latch;

        Stat stat = zk.exists(prev, true);
        if (stat == null) {
            return true;
        }
        System.out.println(Thread.currentThread().getName()+"->等待锁"+prev+"释放");
        if (!timed) {
            latch.await();
            return true;
        }
        long remaining = deadlineNanos - System.nanoTime();
        return remaining > 0 && latch.await(remaining, TimeUnit.NANOSECONDS);
    }

    /** Removes this instance's queue node after a failed attempt; no-op when not queued. */
    private void abandonNode() {
        String node = currentLock;
        currentLock = null;
        if (node == null) {
            return;
        }
        // Clear a pending interrupt so the cleanup call is not aborted by it; it is
        // restored below.
        boolean interrupted = Thread.interrupted();
        try {
            zk.delete(node, -1);
        } catch (KeeperException e) {
            e.printStackTrace();
            // Fall back to ending the session, which removes ephemeral nodes.
            closeSession();
        } catch (InterruptedException e) {
            interrupted = true;
            closeSession();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Closes the ZooKeeper session if one was opened, preserving the interrupt flag. */
    private void closeSession() {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

测试类:

/**
 * Demo driver: starts ten threads that contend for the distributed lock at the same moment.
 * Each thread releases the lock once it has held it, so every contender gets a turn.
 */
public class App {

    /** Number of competing threads. */
    private static final int CONTENDERS = 10;

    /**
     * Launches the contenders and then blocks on standard input so the JVM stays alive.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {

        // Start gate: it opens once the last thread has been started, so all contenders
        // hit ZooKeeper at roughly the same time.
        CountDownLatch startGate = new CountDownLatch(CONTENDERS);
        for (int i = 0; i < CONTENDERS; i++) {
            new Thread(() -> {
                try {
                    startGate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                DistributedLock distributedLock = new DistributedLock();
                distributedLock.lock();
                try {
                    // Critical section: work protected by the lock goes here.
                } finally {
                    // Without this the first winner keeps the lock forever and the
                    // remaining threads never proceed.
                    distributedLock.unlock();
                }
            }).start();
            startGate.countDown();
        }
        System.in.read();
    }
}

结果如下:

相关文章

网友评论

    本文标题:zookeeper原生java手写实现分布式锁

    本文链接:https://www.haomeiwen.com/subject/cmdykctx.html