美文网首页
2018-07-18:03-java操作zookeeper

2018-07-18:03-java操作zookeeper

作者: cjxz | 来源:发表于2018-07-25 15:02 被阅读0次
    • 客户端操作zookeeper学习完后,我们要使用java控制zookeeper。我们先用apache提供的zookeeper功能来操作zookeeper服务器。
    • 我们创建的每个zookeeper都是一个客户端链接

    先看代码,然后解释

    1.创建一个maven项目,导入Apache的zookeeper依赖

    <dependencies>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.6</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.7</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.37</version>
            </dependency>
        </dependencies>
    

    2.编写客户端连接

    package com.zookeeper.test;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    public class ZookeeperConnection {
        private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        private static CountDownLatch countDownLatch = new CountDownLatch(1);
        //连接过程:not connection connecting connected close
        public static void main(String[] args) throws Exception {
    //        testConnecting();
            testConnected();
        }
        public static void testConnecting() throws Exception{
            ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            });
            System.out.println(zk.getState());
        }
    
        public static void testConnected() throws Exception{
            ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        countDownLatch.countDown();
                        System.out.println(watchedEvent.getState());
                    }
    
                }
            });
            countDownLatch.await();
            System.out.println(zk.getState());
        }
    }
    
    zookeeper连接过程
    连接过程.png
    • testConnected方法输出的是connected
      在使用countDounLatch的时候我们等待连接完成才输出
    • testConnecting方法输出的是connecting
      没有使用线程等待,直接输出。出现这种情况是因为连接是异步的,主线程执行完毕,连接还在进行中。

    3.节点的增删改查

    public static void main(String[] args) throws Exception {
    //        testConnecting();
            ZooKeeper zk = testConnected();
            //创建节点:节点路径,节点携带的值,节点的权限,创建持久化节点
            zk.create("/node","abc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //获得节点内容:
            Stat stat = new Stat();
            byte[] data = zk.getData("/node",false,stat);
            System.out.println(new String(data));
            //修改节点
            zk.setData("/node","cba".getBytes(),-1);
            Stat stat1 = new Stat();
            byte[] data1 = zk.getData("/node",false,stat1);
            System.out.println(new String(data1));
            //删除节点
            zk.delete("/node",-1);
        }
    

    4.zookeeper的watch功能(重点)

    watch类似订阅发布模式。监听某个节点,当节点发生变化监听节点就会收到通知。直接上代码

    • watch监听类
    public class MyWatch implements Watcher {
        private static CountDownLatch countDownLatch = new CountDownLatch(1);
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
                        countDownLatch.countDown();
                        System.out.println("连接中..."+watchedEvent.getState());
                    }
                    //节点数据发生变化
                    else if(watchedEvent.getType() == Event.EventType.NodeDataChanged){
                        String path = watchedEvent.getPath();
                        //能够注册事件的方法getData
                        System.out.println("节点数据变化事件--路径:"+watchedEvent.getPath());
                    }
                    //创建节点
                    else if(watchedEvent.getType() == Event.EventType.NodeCreated){
                        System.out.println("创建节点:路径:"+watchedEvent.getPath());
                    }
                    //创建子节点
                    else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
                        String path = watchedEvent.getPath();
                        //能够注册事件的方法getData
                        System.out.println("子节点数据变化事件--路径:"+watchedEvent.getPath());
                    }
                    //节点删除
                    else if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                        System.out.println("删除节点");
                    }
                }
            }catch (Exception e){
                System.out.println(e.getMessage());
            }
        }
    }
    
    • 实现一个watch只需要实现Watcher类就可以了,然后实现里面的process方法
    • WatchedEvent是事件对象,可以根据不同事件类型进行处理
    • 测试watch类
    public class ZkClient {
        private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        public static void main(String[] args) throws Exception {
            MyWatch myWatch = new MyWatch();
            ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
            Stat stat = zk1.exists("/watchNode",true);
            if(stat == null){
                zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
    
            stat = zk1.exists("/watchNode",true);
            if(stat == null){
                zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }else{
                System.out.println("客户端2注册了watchNode节点的getDate watch");
                zk1.getData("/watchNode",true,stat);
                System.out.println("客户端1修改watchNode节点数据");
                zk1.setData("/watchNode","1234".getBytes(),-1);
            }
        }
    }
    
    • 运行结果:


      image.png

    上面我们写了最简单的事件监听。单个客户端监听/watchNode这个节点。zookeeper的监听是单次通知,第二次操作/watchNode时需要二次监听。zookeeper提供监听的方法有三个:exists;getData;getChildren。

    • 多客户端监听同一节点
    public class ZkClient2 {
        private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        public static void main(String[] args) throws Exception {
            MyWatch myWatch = new MyWatch();
            ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
            ZooKeeper zk2 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
            //zk1监听了/watchNode节点事件
            Stat stat1 = zk1.exists("/watchNode",true);
            //zk2监听了/watchNode节点事件
            Stat stat2 = zk2.exists("/watchNode",true);
            if(stat1 == null){
                zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
            //当zk1创建节点后,zk2也收到了通知
        }
    }
    
    • 执行结果:


      image.png

    相关文章

      网友评论

          本文标题:2018-07-18:03-java操作zookeeper

          本文链接:https://www.haomeiwen.com/subject/rhnnpftx.html