美文网首页
zookeeper day1

zookeeper day1

作者: 巴巴11 | 来源:发表于2020-05-16 16:43 被阅读0次

1 zk 简介

zk 管理大数据生态系统中各个组件。(Hadoop、Hive、Spark)


image.png

zk应用场景:

zk是一个经典的分布式数据一致性解决方案。致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

  • 维护配置信息
  • 分布式锁服务
  • 集群管理
  • 生成分布式唯一ID

维护配置信息:

在分布式集群上,需要保证每台机器的配置信息一致。例如数据库的url,用户名密码等。zk提供高可用、高性能的配置服务,能够快速高效的完成集群上配置项的更改,并且能够保证集群上机器配置数据的一致性。

zk 提供配置服务,使用Zab这种一致性协议来保证一致性。

例如在hbase中,客户端先连接zk,获取hbase集群的配置信息,然后才能操作。开源的消息队列kafka中,也是使用zk来维护broker信息。在dubbo中也广泛使用了zk来管理一些配置。

image.png

分布式锁服务:

在分布式系统中,多台服务器运行着相同的服务。当多个服务器在运行时就需要协调各个服务的进度。zk提出了临时有序节点的概念,通过加锁,保证当某个服务在调用时,其他服务不能进行该操作。如果机器挂掉,释放锁并fail over到其他机器继续执行该服务。

image.png

集群管理:

集群中有时因为各种软硬件故障或者网络故障,出现服务器挂掉而被移除出集群或者服务加入集群。zk提供了watch机制,能够将服务的移除/加入的情况通知给集群中其他正常工作的机器,以及时调整存储和计算等任务的分配和执行。zk还会对故障的机器做出诊断并尝试修复。

image.png

生成分布式唯一ID:

每次生成一个新ID时,zk会创建一个持久顺序节点,创建操作返回的节点序号,即为新ID,然后把比自己小的ID删掉。

zk的设计目标:

致力于为分布式应用提供一个高性能、高可用、具有严格顺序访问控制能力的分布式协调服务。

  • 高性能

zk将全量数据存储在内存中,并直接服务于客户端的所有非事务请求。尤其适用于以读为主的场景。

  • 高可用

zk以集群的方式对外提供服务。每台机器都会在内存中维护当前的服务器状态,并且各个机器之间互相保持通信。只要集群中超过一般的机器都能正常工作,那么集群就能正常对外提供服务。

  • 严格顺序访问

对于来自客户端的每个请求,zk都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。

2 zk数据模型

zk进行数据存储时的数据模型

zk的数据节点是树状结构(类似linux文件目录结构)。

树中的每个节点称为znode(zookeeper node),一个znode可用由多个子节点。zk的数据节点在结构上表现为树状结构。使用path来定位到某个znode。比如/wh/node1/node11/node112,wh是根节点。

znode兼具文件和目录两种特点。
节点既像文件一样维护了数据、元信息、ACL、时间戳等数据结构,又像目录一样构成了树结构。作为path的一部分。

image.png

znode大体上分为三部分:

  • 节点的数据:即znode data(节点path,节点data的关系就像map的key-value一样)。
  • 节点的子节点children
  • 节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等。

节点状态stat的属性:
在zk shell中,通过get命令查看指定路径节点的data、stat信息:


image.png

属性说明:

cZxid : 数据节点创建时的事务ID

ctime :数据节点创建时的时间

mZxid : 数据节点最后一次更新时的事务ID

mtime : 数据节点最后一次更新时的时间

pZxid :  数据节点的子节点最后一次被修改时的事务ID

cversion : 子节点的更改次数

dataVersion : 节点数据的更改次数

aclVersion : 节点的ACL的更改次数

ephemeralOwner : 如果节点是临时节点,则表示创建该节点的会话的sessionID;如果节点是持久节点,则该属性值为0

dataLength : 数据内存长度
numChildren : 数据节点当前子节点的数量

节点类型:

分为两种:临时节点和持久化节点。节点的类型在创建时即被确定,并且不能改变。

  • 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(session)结束,临时节点就会被自动删除。当然也可用手动删除。虽然每个临时节点都会被绑定到一个客户端会话,但他们对所有的客户端都是可见的。临时节点不允许拥有子节点。

  • 持久化节点:生命周期不依赖会话。并且只有在客户端执行删除操作的时候,才能被删除。

一般使用持久化有序节点来创建分布式唯一ID。
一般使用临时有序节点来创建分布式锁。

3 zk linux单机安装

zk依赖jdk


image.png
image.png
image.png
image.png

客户端登陆工具 ./zkCli.sh
./zkCli.sh -server ip

4 zk常用shell命令

image.png
image.png image.png image.png

无法直接delete含有子节点的节点,要有rmr命令。

image.png
image.png image.png image.png

ls2 = ls + stat

image.png

一个监听器的注册只能捕获一次事件

image.png

5 针对zk 数据节点的权限控制

Acl权限控制, Access control list

image.png image.png image.png image.png image.png image.png

多个ip授权
setAcl /node2 ip:192.168.1.1:cdrwa,ip:192.168.1.2:cdr

image.png image.png image.png image.png
image.png

6 zk JavaAPI操作zk数据

通过Java API去操作zk中存储的一系列数据。

image.png
package connection;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import javax.swing.plaf.IconUIResource;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkConnection {
    public static void main(String[] args) {
        try {
            // 计数器
            // zk创建是异步的
            final CountDownLatch latch = new CountDownLatch(1);
            // arg1 zk server ip port
            // arg2 client timeout mills
            // arg3 watcher
            ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        latch.countDown();
                    }
                }
            });
            // 阻塞直到连接创建成功
            latch.await();
            // 打印会话编号
            System.out.println(zk.getSessionId());
            System.out.println("all done...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

image.png
package create;

import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkCreate1 {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void testCreate1() throws KeeperException, InterruptedException {
        // arg1 : node path
        // arg2 : node data
        // arg3 : 权限列表 world:anyone:cdrwa
        // arg4 : 节点类型 持久化节点
        zk.create("/wh/node2", "node2".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

package create;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZkCreate1 {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void testCreate1() throws KeeperException, InterruptedException {
        // arg1 : node path
        // arg2 : node data
        // arg3 : 权限列表 world:anyone:cdrwa
        // arg4 : 节点类型 持久化节点
        zk.create("/wh/node2", "node2".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void create2() throws Exception {
        // arg3 : world:anyone:r
        zk.create("/wh/node3", "node3".getBytes(),
                ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void create3() throws Exception {
        // world授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("world", "anyone");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        acls.add(new ACL(ZooDefs.Perms.WRITE, id));
        zk.create("/wh/node4", "node4".getBytes(),
                acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create4() throws Exception {
        // ip授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("ip", "localhost");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zk.create("/wh/node5", "node5".getBytes(),
                acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create5() throws Exception {
        // auth授权模式
        // 添加授权用户
        zk.addAuthInfo("digest", "wh:1234".getBytes());
        zk.create("/wh/node5", "node5".getBytes(),
                ZooDefs.Ids.CREATOR_ALL_ACL , CreateMode.PERSISTENT);
    }

    @Test
    public void create6() throws Exception {
        // auth授权模式
        // 添加授权用户
        zk.addAuthInfo("digest", "wh:1234".getBytes());
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("auth", "wh");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        zk.create("/wh/node5", "node5".getBytes(),
                acls , CreateMode.PERSISTENT);
    }

    @Test
    public void create7() throws Exception {
        // digest授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("digest", "passwdMd5");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zk.create("/wh/node5", "node5".getBytes(),
                acls , CreateMode.PERSISTENT);
    }

    @Test
    public void create8() throws Exception {
        // 持久化顺序节点
        // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
        String result = zk.create("/wh/node5", "node5".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(result); // 节点路径
    }

    @Test
    public void create9() throws Exception {
        // 临时节点
        // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
        String result = zk.create("/wh/node5", "node5".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL);
        System.out.println(result); // 节点路径
    }

    @Test
    public void create10() throws Exception {
        // 临时顺序节点
        // Ids.OPEN_ACL_UNSAFE  world:anyone:cdrwa
        String result = zk.create("/wh/node5", "node5".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(result); // 节点路径
    }

    @Test
    public void create11() throws Exception {
        // 异步方式创建节点
        zk.create("/wh/node5", "node5".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                        new AsyncCallback.StringCallback() {
                            public void processResult(int rc, String path, Object ctx, String name) {
                                // 0 代表创建成功
                                System.out.println(rc);
                                // node path
                                System.out.println(path);
                                // node name
                                System.out.println(name);
                                // 上下文参数
                                System.out.println(ctx);
                            }
                        }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }

}

image.png
package set;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZkSet1 {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void set1() throws KeeperException, InterruptedException {
        // arg1 : node path
        // arg2 : new node data
        // arg3 : 版本号,-1代表版本号不参与更新
        Stat stat = zk.setData("/wh/node2", "node2".getBytes(), -1);
        System.out.println(stat.getCzxid());
    }

    @Test
    public void set2() throws KeeperException, InterruptedException {
        // 异步方式修改节点
        // arg1 : node path
        // arg2 : new node data
        // arg3 : 版本号,-1代表版本号不参与更新
        zk.setData("/wh/node2", "node2".getBytes(), -1
                , new AsyncCallback.StatCallback() {
                    public void processResult(int rc, String path, Object ctx, Stat stat) {
                        // 0 代表创建成功
                        System.out.println(rc);
                        // node path
                        System.out.println(path);
                        // 上下文参数
                        System.out.println(ctx);
                        // 属性描述对象
                        System.out.println(stat.getVersion());
                    }
                }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }


}

image.png
package delete;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkDel1 {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void del1() throws KeeperException, InterruptedException {
        // arg1 : node path
        // arg2 : 版本号,-1代表版本号不参与更新
        zk.delete("/wh/node2", -1);
    }

    @Test
    public void del2() throws KeeperException, InterruptedException {
        // 异步方式删除节点
        // arg1 : node path
        // arg2 : new node data
        // arg3 : 版本号,-1代表版本号不参与更新
        zk.delete("/wh/node2", -1
                , new AsyncCallback.VoidCallback() {
                    public void processResult(int rc, String path, Object ctx) {
                        // 0 代表创建成功
                        System.out.println(rc);
                        // node path
                        System.out.println(path);
                        // 上下文参数
                        System.out.println(ctx);
                    }
                }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }


}

image.png
package get;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkGet1 {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void get1() throws KeeperException, InterruptedException {
        // arg1 : node path
        // arg3 : 读取节点属性的对象
        Stat stat = new Stat();
        byte[] res = zk.getData("/wh/node2", false, stat);
        // 打印节点数据
        System.out.println(new String(res));
        // 版本信息
        System.out.println(stat.getVersion());
    }

    @Test
    public void get2() throws KeeperException, InterruptedException {
        // 异步方式获取节点
        // arg1 : node path
        // arg3 : 版本号,-1代表版本号不参与更新
        zk.getData("/wh/node2", false, new AsyncCallback.DataCallback() {
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                // 0 代表创建成功
                System.out.println(rc);
                // node path
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // data
                System.out.println(new String(data));
                // 属性描述对象
                System.out.println(stat.getVersion());
            }
        }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }


}

image.png
package getChildren;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZkGetChild {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void getChild1() throws KeeperException, InterruptedException {
        // arg1 : node path
        List<String> res = zk.getChildren("/wh/node2", false);
        // 打印节点数据
        for (String child : res)
        System.out.println(child);
    }

    @Test
    public void get2() throws KeeperException, InterruptedException {
        // 异步方式获取子节点
        // arg1 : node path
        zk.getChildren("/wh/node2", false, new AsyncCallback.ChildrenCallback() {
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                // 0 代表创建成功
                System.out.println(rc);
                // node path
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 属性描述对象
                for (String child : children) {
                    System.out.println(child);
                }
            }
        }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }


}

image.png
package exist;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkExistNode {
    ZooKeeper zk;

    @Before
    public void before() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("connectioned ...");
                    latch.countDown();
                }
            }
        });
        latch.await();
    }

    @After
    public void after() throws InterruptedException {
        zk.close();
    }

    @Test
    public void exists() throws KeeperException, InterruptedException {
        // arg1 : node path
        Stat stat = zk.exists("/wh/node2", false);
        // 打印节点数据
        System.out.println(stat);
        // 版本信息
        System.out.println(stat.getVersion());
    }

    @Test
    public void exists1() throws KeeperException, InterruptedException {
        // 异步方式获取节点
        // arg1 : node path
        zk.exists("/wh/node2", false, new AsyncCallback.StatCallback() {
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                // 0 代表创建成功
                System.out.println(rc);
                // node path
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 属性描述对象
                System.out.println(stat.getVersion());
            }
        }, "I am context");
        Thread.sleep(10000);
        System.out.println("all done...");
    }

}

相关文章

网友评论

      本文标题:zookeeper day1

      本文链接:https://www.haomeiwen.com/subject/lxnfohtx.html