美文网首页
Zookeeper 分布式锁

Zookeeper 分布式锁

作者: jianghushao | 来源:发表于2021-11-22 08:32 被阅读0次
    package com.shaolong;
    
    public class Lock {
        private String lockId;
        private boolean isActive;
        private String path;
    
        public Lock() {
        }
    
        public Lock(String lockId, String path) {
            this.lockId = lockId;
            this.path = path;
        }
    
        public String getLockId() {
            return lockId;
        }
    
        public void setLockId(String lockId) {
            this.lockId = lockId;
        }
    
        public boolean isActive() {
            return isActive;
        }
    
        public void setActive(boolean active) {
            isActive = active;
        }
    
        public String getPath() {
            return path;
        }
    
        public void setPath(String path) {
            this.path = path;
        }
    }
    
    
    package com.shaolong;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.util.List;
    import java.util.stream.Collectors;
    
    public class ZookeeperLock {
        private ZkClient zkClient;
    
        private String rootConcatPath = "/wms_lock/";// 从根路径开始 拼接 /wms_lock/itm-0001-0000000026 节点
    
        private String root = "/wms_lock";
    
        public ZookeeperLock(){
            zkClient = new ZkClient("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",2000,5000);
            if(!zkClient.exists(root)){
                zkClient.createPersistent(root);
            }
        }
    
        public Lock lock(String lockId,long timeOut){
            Lock lockNode = createLockNode(lockId);
            lockNode = tryActiveLock(lockNode);
            if(!lockNode.isActive()){
                try {
                    synchronized (lockNode){
                        lockNode.wait(timeOut);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return lockNode;
        }
    
        public Lock tryActiveLock(Lock lock){
            //判断是否获得锁
            List<String> list = zkClient.getChildren(root)
                    .stream().sorted()
                    .map(p -> root+"/" + p)
                    .collect(Collectors.toList());
            String firstPath = list.get(0);
            if(firstPath.equals(lock.getPath())){
                lock.setActive(true);
            }else{
                //添加监听
                String upNodePath = list.get(list.indexOf(lock.getPath()) - 1);
                zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String s, Object o) throws Exception {
    
                    }
    
                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        System.out.println("删除节点 "+dataPath);
                        Lock lockNode = tryActiveLock(lock);
                        synchronized (lock){
                            if(lockNode.isActive()){
                                lockNode.notify();
                            }
                        }
                        zkClient.unsubscribeDataChanges(upNodePath,this);
                    }
                });
            }
    
            //添加上一个节点监听
    
            //再次重试激活锁
    
            return  lock;
        }
    
        public void unLock(Lock lock){
            zkClient.delete(lock.getPath());
        }
    
        private Lock createLockNode(String lockId){
            //创建好目录
            createMakedir(lockId);
    
            //创建临时节点
            String path = zkClient.createEphemeralSequential(rootConcatPath + lockId, "w");
            Lock lock = new Lock();
            lock.setLockId(lockId);
            lock.setPath(path);
            lock.setActive(false);
            return lock;
        }
    
        private void createMakedir(String lockId) {
            if(lockId.contains("/")){
                String temp = lockId;
                if(lockId.startsWith("/")){
                    temp = lockId.substring(1);
                }
                String[] split = temp.split("/");
                for (int i=0;i<split.length-1;i++) {
                    root+="/"+split[i];
                    if (!zkClient.exists(root)) {
                        zkClient.createPersistent(root);
                    }
                }
                if(lockId.startsWith("/")){
                    rootConcatPath = rootConcatPath.substring(0,rootConcatPath.length()-1);
                }
            }
        }
    }
    
    
    
    package com.xinhua.zk;
    
    import com.shaolong.Lock;
    import com.shaolong.ZookeeperLock;
    
    public class RawOut implements Runnable{
    
        ZookeeperLock lock2 = new ZookeeperLock();
    
        String lockId = "rawout/aaa/bbb/mm/itm-0001-";
        @Override
        public void run() {
            Lock lock = null;
            try {
                 lock = lock2.lock(lockId, 3000);
                Thread.sleep(3000);
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock2.unLock(lock);
            }
    
        }
    }
    
    
    
    package com.xinhua.zk;
    
    public class test {
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(new RawOut()).start();
            }
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:Zookeeper 分布式锁

          本文链接:https://www.haomeiwen.com/subject/zkivtrtx.html