美文网首页
zookeeper day1

zookeeper day1

作者: 巴巴11 | 来源:发表于2020-05-16 16:43 被阅读0次

    1 zk 简介

    zk 管理大数据生态系统中各个组件。(Hadoop、Hive、Spark)


    image.png

    zk应用场景:

    zk是一个经典的分布式数据一致性解决方案。致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

    • 维护配置信息
    • 分布式锁服务
    • 集群管理
    • 生成分布式唯一ID

    维护配置信息:

    在分布式集群上,需要保证每台机器的配置信息一致。例如数据库的url,用户名密码等。zk提供高可用、高性能的配置服务,能够快速高效的完成集群上配置项的更改,并且能够保证集群上机器配置数据的一致性。

    zk 提供配置服务,使用Zab这种一致性协议来保证一致性。

    例如在hbase中,客户端先连接zk,获取hbase集群的配置信息,然后才能操作。开源的消息队列kafka中,也是使用zk来维护broker信息。在dubbo中也广泛使用了zk来管理一些配置。

    image.png

    分布式锁服务:

    在分布式系统中,多台服务器运行着相同的服务。当多个服务器在运行时就需要协调各个服务的进度。zk提出了临时有序节点的概念,通过加锁,保证当某个服务在调用时,其他服务不能进行该操作。如果机器挂掉,释放锁并fail over到其他机器继续执行该服务。

    image.png

    集群管理:

    集群中有时因为各种软硬件故障或者网络故障,出现服务器挂掉而被移除出集群或者服务加入集群。zk提供了watch机制,能够将服务的移除/加入的情况通知给集群中其他正常工作的机器,以及时调整存储和计算等任务的分配和执行。zk还会对故障的机器做出诊断并尝试修复。

    image.png

    生成分布式唯一ID:

    每次生成一个新ID时,zk会创建一个持久顺序节点,创建操作返回的节点序号,即为新ID,然后把比自己小的ID删掉。

    zk的设计目标:

    致力于为分布式应用提供一个高性能、高可用、具有严格顺序访问控制能力的分布式协调服务。

    • 高性能

    zk将全量数据存储在内存中,并直接服务于客户端的所有非事务请求。尤其适用于以读为主的场景。

    • 高可用

    zk以集群的方式对外提供服务。每台机器都会在内存中维护当前的服务器状态,并且各个机器之间互相保持通信。只要集群中超过一般的机器都能正常工作,那么集群就能正常对外提供服务。

    • 严格顺序访问

    对于来自客户端的每个请求,zk都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。

    2 zk数据模型

    zk进行数据存储时的数据模型

    zk的数据节点是树状结构(类似linux文件目录结构)。

    树中的每个节点称为znode(zookeeper node),一个znode可用由多个子节点。zk的数据节点在结构上表现为树状结构。使用path来定位到某个znode。比如/wh/node1/node11/node112,wh是根节点。

    znode兼具文件和目录两种特点。
    节点既像文件一样维护了数据、元信息、ACL、时间戳等数据结构,又像目录一样构成了树结构。作为path的一部分。

    image.png

    znode大体上分为三部分:

    • 节点的数据:即znode data(节点path,节点data的关系就像map的key-value一样)。
    • 节点的子节点children
    • 节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等。

    节点状态stat的属性:
    在zk shell中,通过get命令查看指定路径节点的data、stat信息:


    image.png

    属性说明:

    cZxid : 数据节点创建时的事务ID
    
    ctime :数据节点创建时的时间
    
    mZxid : 数据节点最后一次更新时的事务ID
    
    mtime : 数据节点最后一次更新时的时间
    
    pZxid :  数据节点的子节点最后一次被修改时的事务ID
    
    cversion : 子节点的更改次数
    
    dataVersion : 节点数据的更改次数
    
    aclVersion : 节点的ACL的更改次数
    
    ephemeralOwner : 如果节点是临时节点,则表示创建该节点的会话的sessionID;如果节点是持久节点,则该属性值为0
    
    dataLength : 数据内存长度
    numChildren : 数据节点当前子节点的数量
    

    节点类型:

    分为两种:临时节点和持久化节点。节点的类型在创建时即被确定,并且不能改变。

    • 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(session)结束,临时节点就会被自动删除。当然也可用手动删除。虽然每个临时节点都会被绑定到一个客户端会话,但他们对所有的客户端都是可见的。临时节点不允许拥有子节点。

    • 持久化节点:生命周期不依赖会话。并且只有在客户端执行删除操作的时候,才能被删除。

    一般使用持久化有序节点来创建分布式唯一ID。
    一般使用临时有序节点来创建分布式锁。

    3 zk linux单机安装

    zk依赖jdk


    image.png
    image.png
    image.png
    image.png

    客户端登陆工具 ./zkCli.sh
    ./zkCli.sh -server ip

    4 zk常用shell命令

    image.png
    image.png image.png image.png

    无法直接delete含有子节点的节点,要有rmr命令。

    image.png
    image.png image.png image.png

    ls2 = ls + stat

    image.png

    一个监听器的注册只能捕获一次事件

    image.png

    5 针对zk 数据节点的权限控制

    Acl权限控制, Access control list

    image.png image.png image.png image.png image.png image.png

    多个ip授权
    setAcl /node2 ip:192.168.1.1:cdrwa,ip:192.168.1.2:cdr

    image.png image.png image.png image.png
    image.png

    6 zk JavaAPI操作zk数据

    通过Java API去操作zk中存储的一系列数据。

    image.png
    package connection;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import javax.swing.plaf.IconUIResource;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkConnection {
        public static void main(String[] args) {
            try {
                // 计数器
                // zk创建是异步的
                final CountDownLatch latch = new CountDownLatch(1);
                // arg1 zk server ip port
                // arg2 client timeout mills
                // arg3 watcher
                ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                            System.out.println("connectioned ...");
                            latch.countDown();
                        }
                    }
                });
                // 阻塞直到连接创建成功
                latch.await();
                // 打印会话编号
                System.out.println(zk.getSessionId());
                System.out.println("all done...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    
    image.png
    package create;
    
    import org.apache.zookeeper.*;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkCreate1 {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void testCreate1() throws KeeperException, InterruptedException {
            // arg1 : node path
            // arg2 : node data
            // arg3 : 权限列表 world:anyone:cdrwa
            // arg4 : 节点类型 持久化节点
            zk.create("/wh/node2", "node2".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    
    
    package create;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkCreate1 {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void testCreate1() throws KeeperException, InterruptedException {
            // arg1 : node path
            // arg2 : node data
            // arg3 : 权限列表 world:anyone:cdrwa
            // arg4 : 节点类型 持久化节点
            zk.create("/wh/node2", "node2".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        @Test
        public void create2() throws Exception {
            // arg3 : world:anyone:r
            zk.create("/wh/node3", "node3".getBytes(),
                    ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        @Test
        public void create3() throws Exception {
            // world授权模式
            // 权限列表
            List<ACL> acls = new ArrayList<ACL>();
            // 授权模式和授权对象
            Id id = new Id("world", "anyone");
            // 权限设置
            acls.add(new ACL(ZooDefs.Perms.READ, id));
            acls.add(new ACL(ZooDefs.Perms.WRITE, id));
            zk.create("/wh/node4", "node4".getBytes(),
                    acls, CreateMode.PERSISTENT);
        }
    
        @Test
        public void create4() throws Exception {
            // ip授权模式
            // 权限列表
            List<ACL> acls = new ArrayList<ACL>();
            // 授权模式和授权对象
            Id id = new Id("ip", "localhost");
            // 权限设置
            acls.add(new ACL(ZooDefs.Perms.ALL, id));
            zk.create("/wh/node5", "node5".getBytes(),
                    acls, CreateMode.PERSISTENT);
        }
    
        @Test
        public void create5() throws Exception {
            // auth授权模式
            // 添加授权用户
            zk.addAuthInfo("digest", "wh:1234".getBytes());
            zk.create("/wh/node5", "node5".getBytes(),
                    ZooDefs.Ids.CREATOR_ALL_ACL , CreateMode.PERSISTENT);
        }
    
        @Test
        public void create6() throws Exception {
            // auth授权模式
            // 添加授权用户
            zk.addAuthInfo("digest", "wh:1234".getBytes());
            // 权限列表
            List<ACL> acls = new ArrayList<ACL>();
            // 授权模式和授权对象
            Id id = new Id("auth", "wh");
            // 权限设置
            acls.add(new ACL(ZooDefs.Perms.READ, id));
            zk.create("/wh/node5", "node5".getBytes(),
                    acls , CreateMode.PERSISTENT);
        }
    
        @Test
        public void create7() throws Exception {
            // digest授权模式
            // 权限列表
            List<ACL> acls = new ArrayList<ACL>();
            // 授权模式和授权对象
            Id id = new Id("digest", "passwdMd5");
            // 权限设置
            acls.add(new ACL(ZooDefs.Perms.ALL, id));
            zk.create("/wh/node5", "node5".getBytes(),
                    acls , CreateMode.PERSISTENT);
        }
    
        @Test
        public void create8() throws Exception {
            // 持久化顺序节点
            // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
            String result = zk.create("/wh/node5", "node5".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println(result); // 节点路径
        }
    
        @Test
        public void create9() throws Exception {
            // 临时节点
            // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
            String result = zk.create("/wh/node5", "node5".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL);
            System.out.println(result); // 节点路径
        }
    
        @Test
        public void create10() throws Exception {
            // 临时顺序节点
            // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
            String result = zk.create("/wh/node5", "node5".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(result); // 节点路径
        }
    
        @Test
        public void create11() throws Exception {
            // 异步方式创建节点
            zk.create("/wh/node5", "node5".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                            new AsyncCallback.StringCallback() {
                                public void processResult(int rc, String path, Object ctx, String name) {
                                    // 0 代表创建成功
                                    System.out.println(rc);
                                    // node path
                                    System.out.println(path);
                                    // node name
                                    System.out.println(name);
                                    // 上下文参数
                                    System.out.println(ctx);
                                }
                            }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    }
    
    
    image.png
    package set;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkSet1 {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void set1() throws KeeperException, InterruptedException {
            // arg1 : node path
            // arg2 : new node data
            // arg3 : 版本号,-1代表版本号不参与更新
            Stat stat = zk.setData("/wh/node2", "node2".getBytes(), -1);
            System.out.println(stat.getCzxid());
        }
    
        @Test
        public void set2() throws KeeperException, InterruptedException {
            // 异步方式修改节点
            // arg1 : node path
            // arg2 : new node data
            // arg3 : 版本号,-1代表版本号不参与更新
            zk.setData("/wh/node2", "node2".getBytes(), -1
                    , new AsyncCallback.StatCallback() {
                        public void processResult(int rc, String path, Object ctx, Stat stat) {
                            // 0 代表创建成功
                            System.out.println(rc);
                            // node path
                            System.out.println(path);
                            // 上下文参数
                            System.out.println(ctx);
                            // 属性描述对象
                            System.out.println(stat.getVersion());
                        }
                    }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    
    }
    
    
    image.png
    package delete;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkDel1 {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void del1() throws KeeperException, InterruptedException {
            // arg1 : node path
            // arg2 : 版本号,-1代表版本号不参与更新
            zk.delete("/wh/node2", -1);
        }
    
        @Test
        public void del2() throws KeeperException, InterruptedException {
            // 异步方式删除节点
            // arg1 : node path
            // arg2 : new node data
            // arg3 : 版本号,-1代表版本号不参与更新
            zk.delete("/wh/node2", -1
                    , new AsyncCallback.VoidCallback() {
                        public void processResult(int rc, String path, Object ctx) {
                            // 0 代表创建成功
                            System.out.println(rc);
                            // node path
                            System.out.println(path);
                            // 上下文参数
                            System.out.println(ctx);
                        }
                    }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    
    }
    
    
    image.png
    package get;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkGet1 {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void get1() throws KeeperException, InterruptedException {
            // arg1 : node path
            // arg3 : 读取节点属性的对象
            Stat stat = new Stat();
            byte[] res = zk.getData("/wh/node2", false, stat);
            // 打印节点数据
            System.out.println(new String(res));
            // 版本信息
            System.out.println(stat.getVersion());
        }
    
        @Test
        public void get2() throws KeeperException, InterruptedException {
            // 异步方式获取节点
            // arg1 : node path
            // arg3 : 版本号,-1代表版本号不参与更新
            zk.getData("/wh/node2", false, new AsyncCallback.DataCallback() {
                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    // 0 代表创建成功
                    System.out.println(rc);
                    // node path
                    System.out.println(path);
                    // 上下文参数
                    System.out.println(ctx);
                    // data
                    System.out.println(new String(data));
                    // 属性描述对象
                    System.out.println(stat.getVersion());
                }
            }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    
    }
    
    
    image.png
    package getChildren;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkGetChild {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void getChild1() throws KeeperException, InterruptedException {
            // arg1 : node path
            List<String> res = zk.getChildren("/wh/node2", false);
            // 打印节点数据
            for (String child : res)
            System.out.println(child);
        }
    
        @Test
        public void get2() throws KeeperException, InterruptedException {
            // 异步方式获取子节点
            // arg1 : node path
            zk.getChildren("/wh/node2", false, new AsyncCallback.ChildrenCallback() {
                public void processResult(int rc, String path, Object ctx, List<String> children) {
                    // 0 代表创建成功
                    System.out.println(rc);
                    // node path
                    System.out.println(path);
                    // 上下文参数
                    System.out.println(ctx);
                    // 属性描述对象
                    for (String child : children) {
                        System.out.println(child);
                    }
                }
            }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    
    }
    
    
    image.png
    package exist;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkExistNode {
        ZooKeeper zk;
    
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            latch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zk.close();
        }
    
        @Test
        public void exists() throws KeeperException, InterruptedException {
            // arg1 : node path
            Stat stat = zk.exists("/wh/node2", false);
            // 打印节点数据
            System.out.println(stat);
            // 版本信息
            System.out.println(stat.getVersion());
        }
    
        @Test
        public void exists1() throws KeeperException, InterruptedException {
            // 异步方式获取节点
            // arg1 : node path
            zk.exists("/wh/node2", false, new AsyncCallback.StatCallback() {
                public void processResult(int rc, String path, Object ctx, Stat stat) {
                    // 0 代表创建成功
                    System.out.println(rc);
                    // node path
                    System.out.println(path);
                    // 上下文参数
                    System.out.println(ctx);
                    // 属性描述对象
                    System.out.println(stat.getVersion());
                }
            }, "I am context");
            Thread.sleep(10000);
            System.out.println("all done...");
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:zookeeper day1

          本文链接:https://www.haomeiwen.com/subject/lxnfohtx.html