美文网首页
Zookeeper JavaAPI

Zookeeper JavaAPI

作者: 歌哥居士 | 来源:发表于2019-03-29 16:07 被阅读0次

    依赖

        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.5-cdh5.7.0</version>
            </dependency>
    

    连接Zookeeper

    连接

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000,
            (watchedEvent) -> System.out.println(String.format("接收到watcher通知:%s", watchedEvent)));
    System.out.println("---------开始连接---------");
    System.out.println(String.format("连接状态:%s",zk.getState()));
    Thread.sleep(2000);
    System.out.println(String.format("连接状态:%s",zk.getState()));
    System.out.println("---------完成连接---------");
    

    重连

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    Thread.sleep(2000);
    
    ZooKeeper zkReConn = new ZooKeeper("host000:2181", 5000, null, zk.getSessionId(), zk.getSessionPasswd());
    System.out.println("---------开始连接---------");
    System.out.println(String.format("连接状态:%s",zkReConn.getState()));
    Thread.sleep(2000);
    System.out.println(String.format("连接状态:%s",zkReConn.getState()));
    System.out.println("---------完成连接---------");
    

    原子操作

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    ArrayList<Op> opList = new ArrayList<>();
    opList.add(Op.delete("/baozi",-1));
    opList.add(Op.delete("/baozi2", -1));
    zk.multi(opList);
    

    创建节点

    同步

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    String path = zk.create("/baozi", "data-baozi".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    System.out.println(path);
    

    异步

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    zk.create("/baozi2", "data-baozi2".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            ((rc, path, ctx, name) -> {
                String res = String.format("返回的状态码:%s, " +
                                "创建路径:%s, " +
                                "办正事的对象:%s, " +
                                "真实路径:%s",
                        rc,path,ctx,name);
                System.out.println(res);
            }),
            "{'create':'success'}"
    );
    // 等创建操作
    Thread.sleep(2000);
    // 返回结果: 返回的状态码:0, 创建路径:/baozi2, 办正事的对象:{'create':'success'}, 真实路径:/baozi2
    

    监听节点

    ZooKeeper zk = null;
    
    @Test
    public void getDataAndWatch() throws IOException, KeeperException, InterruptedException {
        zk = new ZooKeeper("host000:2181", 5000, (watchEvent) -> {
            // 监听到事件之后,重新获取节点
            System.out.println("--------------------------");
            try {
                zk.getChildren("/", true).forEach(System.out::println);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("--------------------------");
        });
        // 获取节点,打印节点
        zk.getChildren("/", true).forEach(System.out::println);
    
        Thread.sleep(Long.MAX_VALUE);
    }
    

    判断节点是否存在

    @Test
    public void exist() throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
        Stat stat = zk.exists("/baozi", false);
        System.out.println(stat == null ? "not exist" : "exist");
    }
    

    查看ACL

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    // 默认权限
    zk.create("/baozi","data-baozi".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
    // 查看权限
    ArrayList<ACL> acls = (ArrayList<ACL>) zk.getACL("/baozi", new Stat());
    acls.forEach(System.out::println); // 31,s{'world,'anyone}
    

    自定义ACL

    ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
    // Id(scheme,id)+Perms(permissions)
    // 多个可以用| ---  ZooDefs.Perms.CREATE | ZooDefs.Perms.READ
    String idPwd = DigestAuthenticationProvider.generateDigest("baozi:baozi");
    List<ACL> acls = Arrays.asList(
            // 可以添加多个自定义ACL
            new ACL(ZooDefs.Perms.ALL, new Id("digest", idPwd))
    );
    
    // 创建
    zk.create("/baozi", "data-baozi".getBytes(), acls, CreateMode.EPHEMERAL);
    // 查看权限: 31,s{'digest,'baozi:15AYXiVWL/Rytxd36t6YS6HvuY0=}
    zk.getACL("/baozi", new Stat()).forEach(System.out::println);
    

    案例

    1. 在Zookeeper上创建一个/servers节点
    2. 服务端代码
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    
    public class DistributeServer {
    
        private String connectString = "host000:2181";
        private ZooKeeper zk = null;
    
        private void openConnect() throws IOException {
            zk = new ZooKeeper(connectString, 5000, null);
        }
    
        private void register(String serverName) throws KeeperException, InterruptedException {
            zk.create("/servers/server",
                    serverName.getBytes(), // 注册的数据是serverName,服务器名称
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    // 因为服务下线节点就消失,所以用-e
                    // 因为每注册一个节点就递增一次序列,所以用-s
                    CreateMode.EPHEMERAL_SEQUENTIAL
            );
        }
    
        private void business() throws InterruptedException {
            // 为了保持上线
            Thread.sleep(Long.MAX_VALUE);
        }
    
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            DistributeServer server = new DistributeServer();
            // 建立连接
            server.openConnect();
            // 注册服务(创建节点
            server.register(args[0]);
            // 处理业务
            server.business();
        }
    }
    
    1. 客户端代码
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.List;
    
    public class DistributeClient {
    
        private String connectString = "host000:2181";
        private ZooKeeper zk = null;
    
        private void openConnect() throws IOException {
            zk = new ZooKeeper(connectString, 5000, (event -> {
                try {
                    listen();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
        }
    
        private void listen() throws KeeperException, InterruptedException {
            // 监听servers节点,并获得子节点
            List<String> nodes = zk.getChildren("/servers", true);
            for (String node : nodes) {
                // 获取子节点的数据,也就是服务器名称
                byte[] serverName = zk.getData("/servers/"+node, false, null);
                System.out.println(new String(serverName, 0, serverName.length));
            }
        }
    
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
    
    
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            DistributeClient client = new DistributeClient();
            // 1.连接Zookeeper
            client.openConnect();
            // 2.注册监听
            client.listen();
            // 3.业务逻辑处理
            client.business();
        }
    }
    

    相关文章

      网友评论

          本文标题:Zookeeper JavaAPI

          本文链接:https://www.haomeiwen.com/subject/hcqcvqtx.html