美文网首页
Zookeeper简单使用

Zookeeper简单使用

作者: 花丶小伟 | 来源:发表于2017-09-30 17:58 被阅读0次

    zookeeper命令行操作

    • 运行 zkCli.sh –server <ip>进入命令行工具
    • 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容:
      • ls /
    • 创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串:
      • create /zk "myData"
    • 我们运行 get 命令来确认 znode 是否包含我们所创建的字符串:
      • get /zk
    • 监听这个节点的变化,当另外一个客户端改变/zk时,它会打出下面的
      • get /zk watch

    WATCHER::
    WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk

    • 下面我们通过 set 命令来对 zk 所关联的字符串进行设置:
      • set /zk "zsl"
    • 下面我们将刚才创建的 znode 删除:
      • delete /zk
    • 删除节点:
      • rmr /zk

    zookeeper-api应用

    功能 描述
    create 在本地目录树中创建一个节点
    delete 删除一个节点
    exists 测试本地是否存在目标节点
    get/set data 从目标节点上读取 / 写数据
    get/set ACL 获取 / 设置目标节点访问控制列表信息
    get children 检索一个子节点上的列表
    sync 等待要被传送的数据

    demo代码

    import java.io.IOException;
    import java.util.List;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    import org.junit.Before;
    import org.junit.Test;
    
    public class SimpleZkClient {
    
        private static final String connectString = "192.168.127.61:2181,192.168.127.62:2181,192.168.127.63:2181";
        private static final int sessionTimeout = 2000;
    
        ZooKeeper zkClient = null;
    
        @Before
        public void init() throws Exception {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                    System.out.println(event.getType() + "---" + event.getPath());
                    try {
                        zkClient.getChildren("/", true);
                    } catch (Exception e) {
                    }
                }
            });
    
        }
    
        /**
         * 数据的增删改查
         * 
         * @throws InterruptedException
         * @throws KeeperException
         */
    
        // 创建数据节点到zk中
        public void testCreate() throws KeeperException, InterruptedException {
            // 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型
            String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //上传的数据可以是任何类型,但都要转成byte[]
        }
    
        //判断znode是否存在
        @Test   
        public void testExist() throws Exception{
            Stat stat = zkClient.exists("/eclipse", false);
            System.out.println(stat==null?"not exist":"exist");
            
            
        }
        
        // 获取子节点
        @Test
        public void getChildren() throws Exception {
            List<String> children = zkClient.getChildren("/", true);
            for (String child : children) {
                System.out.println(child);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    
        //获取znode的数据
        @Test
        public void getData() throws Exception {
            
            byte[] data = zkClient.getData("/eclipse", false, null);
            System.out.println(new String(data));
            
        }
        
        //删除znode
        @Test
        public void deleteZnode() throws Exception {
            
            //参数2:指定要删除的版本,-1表示删除所有版本
            zkClient.delete("/eclipse", -1);
            
            
        }
        //删除znode
        @Test
        public void setData() throws Exception {
            
            zkClient.setData("/app1", "imissyou angelababy".getBytes(), -1);
            
            byte[] data = zkClient.getData("/app1", false, null);
            System.out.println(new String(data));
            
        }
        
        
    }
    

    Zookeeper的监听器工作机制

    • 监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
    • 监听器的注册是在获取数据的操作中实现:
      • getData(path,watch?)监听的事件是:节点数据变化事件
      • getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

    分布式共享锁的简单实现

    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    public class DistributedClientLock {
        
    
        // 会话超时
        private static final int SESSION_TIMEOUT = 2000;
        // zookeeper集群地址
        private String hosts = "mini1:2181,mini2:2181,mini3:2181";
        private String groupNode = "locks";
        private String subNode = "sub";
        private boolean haveLock = false;
    
        private ZooKeeper zk;
        // 记录自己创建的子节点路径
        private volatile String thisPath;
    
        /**
         * 连接zookeeper
         */
        public void connectZookeeper() throws Exception {
            zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
                public void process(WatchedEvent event) {
                    try {
    
                        // 判断事件类型,此处只处理子节点变化事件
                        if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
                            //获取子节点,并对父节点进行监听
                            List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
                            String thisNode = thisPath.substring(("/" + groupNode + "/").length());
                            // 去比较是否自己是最小id
                            Collections.sort(childrenNodes);
                            if (childrenNodes.indexOf(thisNode) == 0) {
                                //访问共享资源处理业务,并且在处理完成之后删除锁
                                doSomething();
                                
                                //重新注册一把新的锁
                                thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                                        CreateMode.EPHEMERAL_SEQUENTIAL);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // 1、程序一进来就先注册一把锁到zk上
            thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
    
            // wait一小会,便于观察
            Thread.sleep(new Random().nextInt(1000));
    
            // 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
            List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
    
            //如果争抢资源的程序就只有自己,则可以直接去访问共享资源 
            if (childrenNodes.size() == 1) {
                doSomething();
                thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
            }
        }
    
        /**
         * 处理业务逻辑,并且在最后释放锁
         */
        private void doSomething() throws Exception {
            try {
                System.out.println("gain lock: " + thisPath);
                Thread.sleep(2000);
                // do something
            } finally {
                System.out.println("finished: " + thisPath);
                //释放锁
                zk.delete(this.thisPath, -1);
            }
        }
    
        public static void main(String[] args) throws Exception {
            DistributedClientLock dl = new DistributedClientLock();
            dl.connectZookeeper();
            Thread.sleep(Long.MAX_VALUE);
        }
    }
    

    相关文章

      网友评论

          本文标题:Zookeeper简单使用

          本文链接:https://www.haomeiwen.com/subject/jrlnextx.html