zookeeper

作者: 快点给我想个名 | 来源:发表于2019-01-22 10:01 被阅读0次
    zookeeper
    • 节点特性
      1.同级节点的唯一性
      2.临时节点和持久节点
      3.有序节点和无序节点
      4.临时节点下不能存在子节点
    • 节点详情
      cZxid 创建节点时的事务ID
      ctime 创建节点时的时间
      mZxid 最后修改节点时的事务ID
      mtime 最后修改节点时的时间
      pZxid 表示该节点的子节点列表最后一次修改的事务ID,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该ID
      cversion 子节点版本号,子节点每次修改版本号加1
      dataversion 数据版本号,数据每次修改该版本号加1
      aclversion 权限版本号,权限每次修改该版本号加1
      ephemeralOwner 如果是临时节点则内容为绑定的当前sessionID,否则为0
      dataLength 该节点的数据长度
      numChildren 该节点拥有子节点的数量
    • 读写操作
      读:可以在任意节点去读取数据
      写:将请求转发到leader上处理
    • 分布式锁
    package com.simon.lock.zookeeper;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author: lim
     * @Description:
     * @Date: 2018/11/23.
     */
    public class ZookeeperLockUtil implements Watcher{
    
        /**
         * zookeeper集群地址
         */
        private static final String zookeeperConfig = "192.168.33.200:2181,192.168.33.201:2181,192.168.33.203:2181";
    
        /**
         * 持久根节点
         */
        private String LOCK_ROOT= "/LockRoot";
    
        /**
         * 竞争的资源
         */
        private String lockName;
    
        /**
         * 当前节点
         */
        private String CURRENT_LOCK;
    
        /**
         * 等待的节点
         */
        private String WAIT_LOCK;
    
        /**
         * 等待超时时间
         */
        private Long waitTimeout = 5000L;
    
        private CountDownLatch countDownLatch;
    
        /**
         * 保证连接后,再去尝试创建节点
         */
        private CountDownLatch createCountDownLatch = new CountDownLatch(1);
    
        private ZooKeeper zooKeeper;
    
    
        /**
         * 初始化根节点
         * @param lockName 争夺的锁资源
         */
        public ZookeeperLockUtil(String lockName) {
    
            this.lockName = lockName;
    
            try {
                zooKeeper = new ZooKeeper(zookeeperConfig,50000,this);
                createCountDownLatch.await();
                Stat stat = zooKeeper.exists(LOCK_ROOT, this);
                if(stat == null){
                    zooKeeper.create(LOCK_ROOT,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
    
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    
        /**
         * 锁
         * @return
         */
        public boolean lock(){
    
            if(tryLock()){
                return true;
            }
            return waitForLock(WAIT_LOCK,waitTimeout);
        }
    
        /**
         * 等待锁
         * @param wait_lock
         * @param waitTimeout
         * @return
         */
        private boolean waitForLock(String wait_lock, Long waitTimeout) {
    
            try {
                Stat stat = zooKeeper.exists(LOCK_ROOT + "/" + wait_lock, true);
    
                if(stat != null){
                    countDownLatch = new CountDownLatch(1);
                    countDownLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
                    countDownLatch = null;
                }
                return true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
    
        /**
         * 尝试获取锁
         * @return
         */
        public boolean tryLock(){
    
            try {
                //创建临时有序节点
                CURRENT_LOCK = zooKeeper.create(LOCK_ROOT + "/" + lockName,new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
    
                //System.out.println(Thread.currentThread().getName()+"创建节点"+CURRENT_LOCK);
    
                //判断当前节点是否为第一个节点
                List<String> childs = zooKeeper.getChildren(LOCK_ROOT, false);
    
                //System.out.println(Thread.currentThread().getName()+"获取子节点个数"+childs.size());
    
                List<String> sortLocks = new ArrayList<>();
    
                for(String child : childs){
                    String nodePrefix = child.substring(0,lockName.length());
                    if(lockName.equals(nodePrefix)){
                        sortLocks.add(child);
                    }
                }
                Collections.sort(sortLocks);
    
                //System.out.println(Thread.currentThread().getName()+"获取锁"+CURRENT_LOCK);
    
                //如果是则获取锁成功
                if(CURRENT_LOCK.equals(LOCK_ROOT + "/" + sortLocks.get(0))){
                    return true;
                }
    
                //如果不是,则获取当前节点的前一个
                String preNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
    
                WAIT_LOCK = sortLocks.get(Collections.binarySearch(sortLocks,preNode) - 1);
    
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
    
        /**
         * 释放锁
         * @return
         */
        public boolean releaseLock(){
    
            try {
                zooKeeper.delete(CURRENT_LOCK,-1);
                CURRENT_LOCK = null;
                zooKeeper.close();
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            return false;
        }
    
    
        /**
         * 监听事件
         * @param watchedEvent
         */
        @Override
        public void process(WatchedEvent watchedEvent) {
    
            if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                createCountDownLatch.countDown();
            }
    
            if(countDownLatch != null){
                countDownLatch.countDown();
            }
    
        }
    }
    

    测试

    package com.simon;
    
    import com.simon.lock.zookeeper.ZookeeperLockUtil;
    
    /**
     * @Author: lim
     * @Description:
     * @Date: 2018/11/23.
     */
    public class ZookeeperLockTest {
    
        static int counter = 10;
    
    
        public static void main(String[] args) {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    ZookeeperLockUtil zookeeperLockUtil = null;
                    try {
                        zookeeperLockUtil = new ZookeeperLockUtil("demo");
                        if(zookeeperLockUtil.lock()){
                            counter--;
                            System.out.println(Thread.currentThread().getName()+"=="+counter);
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        if(zookeeperLockUtil != null){
                            zookeeperLockUtil.releaseLock();
                        }
                    }
    
                }
            };
    
            for(int i = 0;i < 10;i++){
                Thread t = new Thread(runnable);
                t.start();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:zookeeper

          本文链接:https://www.haomeiwen.com/subject/ndtljqtx.html