[ZooKeeper]基于Java API 实践

作者: 瑾兰 | 来源:发表于2018-06-13 16:51 被阅读18次

    前提

    建立maven项目中 要导入zookeeper的依赖

    <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>3.4.8</version>
    </dependency>
    

    我们同时可以打开linux中的zookeeper客户端来验证对比。输入 zkCli.sh 便可以进入zookeeper客户端 。

    一、建立连接

    直接建立连接后,不进行等待判断 运行结果为连接中(CONNECTING)。

    清单1 连接中 zookeeper
        // 一、没有连接成功
        public static void main(String[] args) {
    
            // zookeeper集群 ip  :客户端口
            String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
    
            try {
                ZooKeeper zooKeeper=new ZooKeeper(kvm,4000,null);
           // CONNECTING  连接中   ==》根据zookeeper的四个状态,可知、;没有连接成功
                System.out.println(zooKeeper.getState()); 
                zooKeeper.close();                          // 关闭连接
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    添加CountDownLatch同步工具类.

    清单2 连接zookeeper成功
       // 二、连接成功   zookeeper
        /**
         * 同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
         */
        public static void main(String[] args) {
            final CountDownLatch countDownLatch=new CountDownLatch(1);
            String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
    
            try {
                ZooKeeper zooKeeper=new ZooKeeper(kvm, 4000, new Watcher() {
                    // process: 观察者队列
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        //  SyncConnected  :同步连接
                        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
                            // 如果收到服务端的响应时间,连接成功
    
                            countDownLatch.countDown();
                            System.out.println("建立连接成功");
                        }
    
                    }
                });
    
                countDownLatch.await();
                System.out.println(zooKeeper.getState()); // CONNECTED  :连接成功
                zooKeeper.close();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    二、数据的增删改查操作

    基于代码的增删改查操作,我们可以去到zookeeper中的zkCli客户端窗口中去验证。
    进入zkCli客户端窗口:在bin目录下 直接输入 zkCli.sh 回车,便可进入。

    清单1 事务操作
     public static void main(String[] args) {
    
            try {
                final CountDownLatch countDownLatch=new CountDownLatch(1);
                java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";
    
                java.lang.String kvm2="192.168.0.85:2181";
    
                ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
    
                        if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
                            countDownLatch.countDown();//如果收到了服务端的响应时间,连接成功
                            System.out.println("zk 建立连接");
                        }
                    }
                });
                countDownLatch.await();
    
    
                System.out.println(zooKeeper.getState());// connected  成功
    
                /**
                 * zookeeper数据的增删改查
                 */
                // 1.新增节点
                zooKeeper.create("/zk-wcl","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
                Thread.sleep(1000);
                Stat stat=new Stat();// 状态
                // 2.查看新增后的节点状态
                byte[] bytes=zooKeeper.getData("/zk-wcl",null,stat);
    
                System.out.println(new String(bytes));
                // 3.修改节点
                zooKeeper.setData("/zk-wcl","1".getBytes(),stat.getVersion());
    
                // 4.查看修改后的节点状态
                byte[] byte2=zooKeeper.getData("/zk-wcl",null,stat);
                System.out.println(new String(byte2));
    
    
                // 5.删除节点
                zooKeeper.delete("/zk-wcl",stat.getVersion());
    
    
                zooKeeper.close();
                System.in.read();// 当前进程阻塞
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
    
    
        }
    

    三、事件机制

    介绍

    Watcher监听机制是ZooKeeper中非常重要的特性,我们基于zookeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件。通过这个事件机制,可以基于zookeeper实现分布式锁、集群管理功能。

    Watcher特性:当数据发生变化的时候,zookeeper会产生一个Watcher事件,并且会发送到客户端,但是客户端只会接受一次通知。如果后续这个节点再次发生变化,那么之前的设置watcher的客户端不会再次受到消息。

    watcher是一次性操作。可以通过循环监听去达到永久监听效果。

    1、如何注册事件机制

    分为两步:绑定事件、触发事件。
    第一步:通过getDateexistsgetChildren这三个操作来绑定事件。
    第二步:凡是事务类型的操作,都会触发监听事件。
    事务操作:create 、``delete````、 setData

    清单1:创建删除节点的事件注册监听实例
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            /*------------------基于Java API 建立连接---------------------------*/
            final CountDownLatch countDownLatch=new CountDownLatch(1);
            java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";
    
            java.lang.String kvm2="192.168.0.85:2181";
            // TODO: 这儿是全局的Watcher
            ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // 默认事件
                    System.out.println("默认事件:"+watchedEvent.getType());
                    if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
                        countDownLatch.countDown();//如果收到了服务端的响应时间,连接成功
                        System.out.println("zk 建立连接");
                    }
                }
            });
            countDownLatch.await();
    
            /*--------------如何注册、监听事件机制-------------------------*/
            // 一、去创建一个临时节点
            zooKeeper.create("/zk-test-wcl","1".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
    
            /**
             * 二、针对这个节点 绑定事件  3 种方式
             * (exists getdata getchildren)
             * 解释:
             * zooKeeper.exists(String path,Watcher watcher)中的参数
             * watcher为true时,代表默认触发全局 zookeeper 中的内部类
             *
             * 演示:
             * 我们以 exists 来 绑定事件
             */
            Stat stat=zooKeeper.exists("/zk-test-wcl", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent.getType()+"-->"+watchedEvent.getPath());
                    // TODO:NOTE ==> 因为watcher是一次性操作,只能看到setData操作带来的变化,delete操作看不到变化。
                    // TODO:NOTE ==> 所以,要在绑定一次事件,来持续监听
    
                    try {
                        // TODO :注意true 默认用的是全局的watcher
                        zooKeeper.exists(watchedEvent.getPath(),true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // 通过修改的事务类型操作来触发监听事件
            // TODO:1、setData
           stat = zooKeeper.setData("/zk-test-wcl","2".getBytes(),
                    stat.getVersion());
            Thread.sleep(1000);
            // TODO: 2、delete
            zooKeeper.delete("/zk-test-wcl",stat.getVersion());
            System.in.read();
        }
    
    
    
    2、watcher事件类型
    事件类型 含义
    None(-1) 客户端连接状态发生改变时,会受到none事件
    NodeCreated(1) 创建节点事件
    NodeDeleted(2) 删除节点事件
    NodeDataChanged(3) 节点数据发生变更
    NodeChildrenChanged(4) 子节点被创建、被删除会发生事件触发
    3、什么样的操作会产生什么类型的事件

    待续。。。。。

    4、事件的实现原理

    以上内容是关于zookeeper 的zkCli客户端java API的实操。其实zookeeper还有一个客户端Curator。Curator是对zookeeper原生Java API的封装。下面来记录下,我贴下练习的代码。

    四、zookeeper客户端:Curator 在java API中的实操

    文件图.png

    前提:

    在pom.xml文件中导入依赖的Curator包

        <!--开源客户端 Curator 依赖包 start-->
          <dependency>
              <groupId>org.apache.curator</groupId>
              <artifactId>curator-framework</artifactId>
              <version>4.0.0</version>
          </dependency>
          <dependency>
              <groupId>org.apache.curator</groupId>
              <artifactId>curator-recipes</artifactId>
              <version>4.0.0</version>
          </dependency>
          <!--开源客户端 Curator 依赖包  end -->
    
    1、获取,创建,修改信息

    建立连接并获取curator节点,注意,这个节点必须是已经存在的。

    清单1、客户端的链接及获取命名空间
     String kvm = "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
                    connectString(kvm).sessionTimeoutMs(4000).
                    retryPolicy(new ExponentialBackoffRetry(100, 3)).
                    namespace("curator").build();
            curatorFramework.start();
    
    清单2、创建一个多级节点

    在原生Java API中创建节点,必须逐层创建,即必须先存在父节点,子节点才能创建。像上面中已经介绍过的zookeeper zkCli客户端。

    现在在进一步封装的Curator客户端,便可以直接创建一个多级节点。

    curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
                    forPath("/wcl/node1","1".getBytes());
    
    清单3、获取信息并修改信息
     Stat stat = new Stat();
            curatorFramework.getData().storingStatIn(stat).forPath("/wcl/node1");
            curatorFramework.setData().withVersion(stat.getVersion()).forPath("/wcl/node1", "xx".getBytes());
    
            
    

    注意:在main()方法的最后 要关闭。添加这句代码:curatorFramework.close();

    相关文章

      网友评论

        本文标题:[ZooKeeper]基于Java API 实践

        本文链接:https://www.haomeiwen.com/subject/vezjeftx.html