zookeeper

作者: 骏龙ll | 来源:发表于2021-07-04 09:40 被阅读0次

    转自:https://blog.csdn.net/qq_41112238/article/details/105240421

    基本概念
    大数据生态系统里很多组件的命名都是某种动物,例如Hadoop是🐘,hive是🐝,zookeeper就是动物园管理者,是管理大数据生态系统各组件的管理员。

    zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

    应用场景
    维护配置信息
    Java编程经常会遇到配置项,例如数据库的user、password等,通常配置信息会放在配置文件中,再把配置文件放在服务器上。当需要修改配置信息时,要去服务器上修改对应的配置文件,但在分布式系统中很多服务器都需要使用该配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置的一致性。通常会将配置文件部署在一个集群上,但一个集群涉及的服务器数量是很庞大的,如果一台台服务器逐个修改配置文件是效率很低且危险的,因此需要一种服务可以高效快速且可靠地完成配置项的更改工作。
    zookeeper就可以提供这种服务,使用Zab一致性协议保证一致性。hbase中客户端就是连接zookeeper获得必要的hbase集群的配置信息才可以进一步操作。在开源消息队列Kafka中,也使用zookeeper来维护broker的信息。在dubbo中也广泛使用zookeeper管理一些配置来实现服务治理。

    分布式锁服务
    一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,在多台服务器运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器故障,释放锁并fall over到其他机器继续执行。

    集群管理
    zookeeper会将服务器加入/移除的情况通知给集群中其他正常工作的服务器,以及即使调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复。

    生成分布式唯一ID
    在过去的单库单表系统中,通常使用数据库字段自带的auto_increment熟悉自动为每条记录生成一个唯一的id。但分库分表后就无法依靠该属性来标识一个唯一的记录。此时可以使用zookeeper在分布式环境下生成全局唯一性id。每次要生成一个新id时,创建一个持久顺序结点,创建操作返回的结点序号,即为新id,然后把比自己结点小的删除。

    设计目标
    高性能
    将数据存储在内存中,直接服务于客户端的所有非事务请求,尤其适合读为主的应用场景
    高可用
    一般以集群方式对外提供服务,每台机器都会在内存中维护当前的服务状态,每台机器之间都保持通信。只要集群中超过一般机器都能正常工作,那么整个集群就能够正常对外服务。
    严格顺序访问
    对于客户端的每个更新请求,zookeeper都会生成全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。
    数据结构
    zookeeper的数据结点可以视为树状结构(或者目录),树中各节点成为znode,一个znode可以有多个子结点。zookeeper结点在结构上表现为树状,使用路径来定位某个znode。
    znode兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。

    znode大体上分为三部分(使用get命令查看)
    ①结点的数据
    ②结点的子结点
    ③结点的状态

    结点类型(在创建时被确定且不能更改)
    ①临时结点:生命周期依赖于创建它的会话,会话结束结点将被自动删除。临时结点不允许拥有子结点。
    ②持久化结点:生命周期不依赖于会话,只有在客户端显式执行删除操作时才被删除。

    安装(我恨linux)
    使用centos8虚拟机,先创建一个zookeeper用户

    zookeeper是基于jdk的,先安装jdk1.8:安装JDK

    安装zookeeper(我是真的烦Linux 找个下载地址找一年 还慢死):

    wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
    1
    30kb/s 慢慢等吧

    终于下载完了,tar -xzvf zookeeper-3.4.14.tar.gz进行解压

    然后进入zookeeper的conf配置目录,复制zoo_sample.cfg 命名zoo.cfg。然后回到上级目录,创建一个data目录。

    然后进入conf目录,使用vi命令编辑刚才复制的zoo.cfg配置文件,打开后按a进入编辑模式。

    修改datadir为刚才创建的data目录,然后按esc,再按:到文件末尾,使用wq保存退出。

    卧槽…居然成功啦
    先cd ..返回zookeeper根目录,再cd bin进入bin目录,./zkServer.sh start 启动zookeeper,等待一段时间后,./zkServer.sh status查看zookeeper是否成功启动,Mode显示的是当前是单机状态

    通过客户端连接本机zookeeper服务./zkCli.sh,登录成功后会打印日志信息

    关闭zookeeper:

    基本命令
    新增结点
    //-s为有序结点,-e为临时结点
    create [-s] [-e] path data
    1
    2
    先启动服务器

    确实已经启动

    通过客户端连接服务器

    创建持久化结点
    path是/hadoop 数据是“123456” 不加选项默认是持久化结点

    通过get读取数据

    持久化结点与当前会话无关,quit退出会话后再次登陆,get数据还在

    创建持久化有序结点
    创建路径为/a,会自动补为/a0000000001

    创建临时结点
    临时结点在会话结束后会被删除

    quit结束会话再次登陆,不能读取到数据

    创建临时有序结点(用于分布式锁)

    更新结点
    更新结点的命令是set,可以直接进行修改

    修改/hadoop的值从123456变为i want offer

    成功

    也可以基于版本号更改,类似CAS机制,当传入数据版本号和当前不一致时拒绝修改,初始版本号dataVersion为0,每次修改后会加1

    当前版本为1,修改时使用1版本号,修改完变为2

    版本号为2时,使用3会失败

    删除结点
    使用delete命令,同样可以传入版本号,如果版本号不符合也不会执行
    版本号为2时,使用3删除失败,使用2删除成功

    如果当前结点下有子结点,不能删除,如果要删除需要使用rmr

    查看结点
    使用get查看结点的属性和数据

    cZxid 数据结点创建时的事务id
    ctime 数据结点创建时间
    mZxid 数据结点最后一次更新时的事务id
    mtime 数据结点最后一次更新的时间
    pZxid 子结点最后一次修改的事务id
    cversion 子结点的更改次数
    dataVersion 结点数据更改次数
    aclVersion 结点ACL的更改次数
    ephemeralOwner 如果是临时结点,表示会话的sessionID;如果是持久结点值为0
    dataLength 数据内容长度
    numChildren 子结点数

    使用stat只返回属性 没有数据

    查看结点列表
    使用ls path或ls2 path,ls2是ls的增强,除了列出子结点还有当前结点的属性

    监听器(维护配置信息)
    使用get path watch,监听器只能使用一次
    左边客户端使用监听器,右边客户端更改了数据,左边监听到了数据的更改

    也可以使用stat path watch

    还可以使用ls/ls2 path watch 可以监听到子结点

    ACL权限控制
    类似linux针对文件的权限控制
    acl权限控制使用scheme:id:permission标识,
    ①scheme :权限模式

    world 只有一个用户,代表登陆zookeeper的所有人(默认)
    ip 对客户端使用ip地址认证
    auth 使用已添加认证的用户认证
    digest 使用用户名:密码方式认证
    ②id :授权对象
    ③permission :授予的权限

    create 简写c 可以创建子结点
    delete 简写d 可以删除子结点(仅下一级)
    read 简写r 可以读取结点数据以及显示子结点
    write 简写w 可以设置结点数据
    admin 简写a 可以设置节点访问控制列表权限
    例如setAcl /hadoop ip:192.168.2.142:crwda 表示ip地址为192.168.2.142的客户端对/hadoop有全部权限
    getAcl path 读取权限
    setAcl path acl 设置权限
    addauth schema suth 添加认证用户

    world授权模式
    setAcl path world:anyone:<acl>
    例如取消c创建权限,就不能再创建子结点

    取消d权限,不能删除子结点

    ip授权模式
    命令setAcl path ip:<ip>:<acl>
    要用两个虚拟机,这个就不演示了…
    主要就是限制客户端ip地址的

    auth模式
    addauth digest <user>:<password>添加认证用户
    setAcl <path> auth:<user>:<acl> 授权
    1
    2

    另一台客户端如果没有添加该用户就不能读取

    digest授权模式
    setAcl <path> digest:<user>:<password>:<acl>
    这里的密码是经过SHA1及BASE64处理的密文
    通过echo -n sjh:sjh2019. | openssl dgst -binary -sha1 | openssl base64生成密文,复制该结果

    添加结点/node4,使用digest模式授权,因为之前添加过sjh密码sjh019.的用户,所以可以访问

    多种授权模式
    同一个结点可以使用多种授权模式,用逗号隔开就行,例

    超级管理员
    假设超级管理员的账号是super:admin,先要为其生成密文:

    得到密文xQJmxLMiHGwaqBvst5y6rkB6HQs=
    在zookeeper目录下/bin/zkServer.sh服务器脚本文件,找到这一行

    "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="加入这一句

    修改完按esc ,按:, 输入wq保存退出
    重启

    使用客户端登陆,创建/node6,取消创建子结点权限

    此时不能创建子结点

    添加超级管理员用户,此时可以成功添加子结点

    IDEA操作zookeeper
    操作流程
    连接到zookeeper服务器(好害怕连不上- - 。。。昨天GitHub连上了,不知道今天能不能再撞一次运气。。连不上这篇文章也就到此为止了。。。)
    定期向服务器发送心跳,否则会过期
    会话处于活动状态就可以获取/设置znode
    所有任务完成后,断开连接
    连接到zookeeper
    创建一个新的Java工程,导入一下jar包

    妈的连了快俩小时终于搞定了!!!

    导完jar包后还需要导入一个log4j文件,创建一个连接zookeeper的测试类

    public class zookeeperTest {

    public static void main(String[] args) {
        ZooKeeper zooKeeper = null;
        try{
            //创建一个计数器对象
            CountDownLatch countDownLatch=new CountDownLatch(1);
            //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
            zooKeeper=new ZooKeeper("192.168.2.142:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("连接创建成功");
                        //通知主线程解除阻塞
                        countDownLatch.countDown();
                    }
                }
            });
            //主线程阻塞,等待连接对象的创建成功
            countDownLatch.await();
            System.out.println("会话编号"+zooKeeper.getSessionId());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(zooKeeper!=null) {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    由于连接是异步的,所以用countdownlatch确保连接成功再继续
    一直失败!
    最后关了防火墙。。。就成功了!!!

    创建结点
    在Linux客户端先创建一个/create结点

    在IDEA创建一个子结点/node1

    public class zkCreate {

    private static final String IP="192.168.2.142:2181";
    private static ZooKeeper zooKeeper;
    
    @Before
    public void connect() throws Exception{
        //创建一个计数器对象
        CountDownLatch countDownLatch=new CountDownLatch(1);
        //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
        zooKeeper=new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()==Event.KeeperState.SyncConnected){
                    System.out.println("连接创建成功");
                    //通知主线程解除阻塞
                    countDownLatch.countDown();
                }
            }
        });
        //主线程阻塞,等待连接对象的创建成功
        countDownLatch.await();
    }
    
    @After
    public void close() throws Exception{
        zooKeeper.close();
    }
    
    @Test
    public void create1() throws Exception{
        //同步创建结点
        // 参数1 结点路径
        // 参数2 结点数据
        // 参数3权限列表 OPEN_ACL_UNSAFE代表world方式授权 cdrwa
        // 参数4 结点类型 persistent表示持久化结点
        zooKeeper.create("/create/node1","i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    运行后在Linux客户端查看

    创建一个只有读权限的数据

    @Test
    public void create2() throws Exception{
        //  OPEN_ACL_UNSAFE代表world方式授权 r只能读
        zooKeeper.create("/create/node2","i want offer".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    

    1
    2
    3
    4
    5
    运行后查询

    自定义设置权限列表
    使用world模式,设置有创建和删除权限

    @Test
    public void create3() throws Exception{
        //自定义方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("world","anyone");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        acls.add(new ACL(ZooDefs.Perms.DELETE,id));
        zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }
    

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ip模式授权

    @Test
    public void create4() throws Exception{
    //ip方式设置权限
    List<ACL> acls=new ArrayList<>();
    Id id=new Id("ip","192.168.2.142");
    acls.add(new ACL(ZooDefs.Perms.CREATE,id));
    zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    auth方式授权

    @Test
    public void create5() throws Exception{
    //auth方式设置权限
    zooKeeper.addAuthInfo("digest","sjh:sjh2019.".getBytes());
    zooKeeper.create("/create/node5","i want offer".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }
    1
    2
    3
    4
    5
    6
    digest方式授权

    @Test
    public void create6() throws Exception{
    //digest方式设置权限
    List<ACL> acls=new ArrayList<>();
    Id id=new Id("digest","sjh:base64和sha1加密后的密码");
    acls.add(new ACL(ZooDefs.Perms.CREATE,id));
    zooKeeper.create("/create/node5","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    创建持久化有序结点

    @Test
    public void create7() throws Exception{
    //持久化有序结点
    String s = zooKeeper.create("/create/node7", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    System.out.println(s);
    }
    1
    2
    3
    4
    5
    6
    运行结果:

    创建临时结点

    @Test
    public void create8() throws Exception{
    //创建临时结点
    String s = zooKeeper.create("/create/node8", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println(s);
    }
    1
    2
    3
    4
    5
    6
    创建临时有序结点

    @Test
    public void create9() throws Exception{
    //创建临时结点
    String s = zooKeeper.create("/create/node9", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(s);
    }
    1
    2
    3
    4
    5
    6
    异步创建结点

     @Test
    public void create10() throws Exception{
        //异步创建结点
        zooKeeper.create("/create/node11", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                System.out.println("创建状态: "+rc);//0表示创建成功
                System.out.println("path: "+path);//结点路径
                System.out.println("name: "+name);//结点路径
                System.out.println("ctx: "+ctx);//上下文
            }
        },"context");
    }
    

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    运行结果,由于是异步的,可能还没有输出就结束了

    调用sleep方法让线程休眠,等待zookeeper操作执行完毕

    成功:

    更新结点
    先查询/create/node1结点的当前值为i wangt offer

    同步更新结点:

    @Test
    public void set1() throws Exception{
        //同步更新结点
        //第一个参数 结点路径
        //第二个参数 要修改的值
        //第三个参数 数据版本 -1代表版本号不参与更新
        zooKeeper.setData("/create/node1","2020GetGoodOffer".getBytes(),-1);
    }
    

    1
    2
    3
    4
    5
    6
    7
    8
    运行完成后,再次查询该结点,值已经更改为2020GetGoodOffer

    异步修改结点

    @Test
    public void set2() throws Exception{
    //异步更新结点
    //第一个参数 结点路径
    //第二个参数 要修改的值
    //第三个参数 数据版本 -1代表版本号不参与更新
    //第四个参数 匿名回调函数
    //第五个参数 上下文参数
    zooKeeper.setData("/create/node1", "2020 Get Offer!!!".getBytes(), -1, new AsyncCallback.StatCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    System.out.println("rc: "+rc);//0表示成功
    System.out.println("path: "+path);
    System.out.println("ctx: "+ctx);
    System.out.println("stat: "+stat);
    }
    },"context");
    Thread.sleep(1000);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    结果:

    删除结点
    同步删除

    @Test
    public void del1() throws Exception{
    //同步删除数据
    //第一个参数表示删除结点的路径
    //第二个参数表示删除结点的数据版本 -1表示删除时不考虑版本信息
    zooKeeper.delete("/create/node1",-1);
    }
    1
    2
    3
    4
    5
    6
    7
    此时数据已不存在

    异步删除

    @Test
    public void del2() throws Exception{
    //异步删除数据
    zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx) {
    System.out.println("rc: "+rc);//0表示成功
    System.out.println("path: "+path);
    System.out.println("ctx: "+ctx);
    }
    },"context");
    Thread.sleep(1000);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    运行结果

    查看结点
    同步查看结点

    @Test
    public void get1() throws Exception{
    //同步读取数据
    //第一个参数是路径
    //第二个参数是watch,先填false(以后在讲)
    //第三个参数用于获取结点属性
    Stat stat = new Stat();
    byte[] data = zooKeeper.getData("/create/node3", false, stat);
    System.out.println(new String(data));//数据
    System.out.println(stat);//属性
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    结果

    异步查看结点

    @Test
    public void get2() throws Exception{
    //异步读取数据
    //第一个参数是路径
    //第二个参数是watch,先填false(以后在讲)
    //第三个参数是匿名回调函数
    Stat stat = new Stat();
    zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
    System.out.println("rc: "+rc);//0表示成功
    System.out.println("path: "+path);//结点路径
    System.out.println("ctx: "+ctx);//上下文参数
    System.out.println(new String(data));//数据
    System.out.println(stat);//属性
    }
    },"context");
    Thread.sleep(1000);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    运行结果:

    查看子结点
    创建测试数据

    同步

    @Test
    public void getChild1() throws Exception{
        //同步
        //第一个参数是父路径
        //第二个参数是watch,先填false(以后在讲)
        List<String> children = zooKeeper.getChildren("/create/father", false);
        for(String str:children)
            System.out.println(str);
    }
    

    1
    2
    3
    4
    5
    6
    7
    8
    9
    运行结果:

    异步

    @Test
    public void getChild2() throws Exception{
    //异步
    //第一个参数是父路径
    //第二个参数是watch,先填false(以后在讲)
    //第三个参数是匿名回调函数
    zooKeeper.getChildren("/create/father", false, new AsyncCallback.ChildrenCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
    System.out.println("rc: "+rc);//0表示成功
    System.out.println("path: "+path);//结点路径
    System.out.println("ctx: "+ctx);//上下文参数
    System.out.println(children);//子结点信息
    }
    },"context");
    Thread.sleep(1000);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    运行结果

    检查结点是否存在
    同步

    @Test
    public void exists1() throws Exception{
        //同步判断
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        Stat exists = zooKeeper.exists("/create/null", false);
        System.out.println(exists==null?"不存在":"存在");
    }
    

    1
    2
    3
    4
    5
    6
    7
    8
    运行结果:

    异步

    @Test
    public void exists2() throws Exception{
    //异步判断
    //第一个参数是路径
    //第二个参数是watch,先填false(以后在讲)
    //第三个参数是匿名回调函数
    //第四个参数上下文
    zooKeeper.exists("/create/null", false, new AsyncCallback.StatCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    System.out.println("rc: "+rc);//0表示成功
    System.out.println("path: "+path);//结点路径
    System.out.println("ctx: "+ctx);//上下文参数
    System.out.println(stat==null?"不存在":"存在");
    }
    },"context");
    Thread.sleep(1000);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    运行结果:

    相关文章

      网友评论

          本文标题:zookeeper

          本文链接:https://www.haomeiwen.com/subject/qysmultx.html