美文网首页
zookeeper-zkclinet,curator,简单锁

zookeeper-zkclinet,curator,简单锁

作者: xc2023 | 来源:发表于2019-03-23 12:29 被阅读0次

    zkclient

    1),可以递归创建
    2),可以递归删除

    创建一个单机的zookeeper

    systemctl stop firewalld#关闭防火墙
    docker pull zookeeper:3.4 #拉取镜像
    docker run -d --name=zookeeper -p 2181:2181 zookeeper:3.4 #创建容器
    docker exec -it zookeeper /bin/bash#进入容器
    zkCli.sh #进入zookeeper客户端
    

    依赖

    <dependency>
                <groupId>com.github.sgroschupf</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.11</version>
            </dependency>
    

    zkclientApi

    package com.demo.service;
    
    import com.demo.config.Person;
    import org.I0Itec.zkclient.IZkChildListener;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.SerializableSerializer;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.List;
    
    public class ZkService {
    
        public static ZkClient zkClient =
                new ZkClient("192.168.116.150:2181", 5000, 5000, new SerializableSerializer());
    
        /**
         *  创建节点
         * path 节点路径
         * data 数据 ,可谓null
         * mode 4种类型 Ephemeral 临时的会话 persistent 永久的会话
         * acl acl策略
         * callback 注册一个异步回调函数
         * context 传递一个对象
         * createParents 指定是否创建父节点
         */
        public static void create() {
            zkClient.create("/test1","zkclient_test", CreateMode.EPHEMERAL);
            zkClient.createEphemeral("/test2");
            zkClient.createPersistent("/person",new Person("xiaoming","123456"));
            zkClient.createPersistent("/hello/c1",true);
        }
    
        /**
         * 删除
         */
        public static void delete() {
            zkClient.delete("/person/xiaoming"); //删除节点
            //zkClient.deleteRecursive("/testzoo1");//删除节点和子节点
        }
    
        /**
         * 修改
         */
        public static void update() {
            //zkClient.writeData("/testzoo3","hello");//写数据,覆盖原来的值
        }
    
        /**
         * 是否存在
         */
        public static void exists() {
            zkClient.exists("/Person");
        }
    
        /**
         * 读取节点的值
         * 对象要实现序列化接口
         */
        public static void select() {
            Stat stat = new Stat(); //节点的信息
            Person person = zkClient.readData("/person", stat);
            System.out.println(stat);
            System.out.println(person);
        }
    
        /**
         * 注册监听会开启一个新的线程来处理,无需自己在开一条线程单独注册
         *  监听接口                    注册监听方法                                  解除监听
         * IZkChildListener监听子节点     ZkClient的subscribeChildChanges方法        ZkClient的unsubscribeChildChanges方法
         * IZkDataListener 监听数据的变化     ZkClient的subscribeDataChanges方法         ZkClient的subscribeDataChanges方法
         * IZkStateListener监听服务状态的状态     ZkClient的subscribeStateChanges方法        ZkClient的unsubscribeStateChanges方法
         */
        public static void listen() {
            zkClient.subscribeChildChanges("/person", new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println("parentpath" + parentPath + "--" + currentChilds);
                }
            });
        }
        /**
         * 监听节点数据的变化
         * */
        public static void listen(String string) {
            zkClient.subscribeDataChanges("/person", new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    //变化触发
                }
    
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    //删除时触发
                }
            });
        }
        /**
         * 添加数据
         */
        public static void main(String[] args) throws InterruptedException {
            ZkService.select();
    
            Thread.sleep(60 * 1000);
        }
    }
    
    zoo1.png
    zoo2.png

    zkclient分布式锁
    有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,
    临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
    事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。

    流程

    • 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推。
    • 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听/lock的子节点变更消息,获得子节点变更通知后重复此步骤直至获得锁;
    package com.demo.lock;
    
    public interface BaseLock {
    
        /**
         * 获取锁
         * */
        boolean getlock();
    
        /**
         * 释放锁
         * */
        void unlock();
    }
    
    public class BaseLockImpl implements BaseLock {
    
        private static final String ZOOKEEPER_IP_PORT = "192.168.116.150:2181";
        private static final String LOCK_PATH = "/LOCK";
        private CountDownLatch countDownLatch;
        private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
        private String beforePath;
        private String currentPath;
    
        // 判断有没有LOCK目录,没有则创建
        public BaseLockImpl() {
            if (!this.client.exists(LOCK_PATH)) {
                this.client.createPersistent(LOCK_PATH);
            }
        }
    
        @Override
        public boolean getlock() {
            if (tryLock()) {
                System.out.println("=======================获取锁");
                return true;
            } else {
                waitForLock();
                return getlock();
            }
        }
    
        @Override
        public void unlock() {
            // 删除当前临时节点
            client.delete(currentPath);
            System.out.println("======删除节点=================");
        }
    
        /**
         * 创建节点
         */
        public Boolean tryLock() {
            // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            if (currentPath == null || currentPath.length() <= 0) {
                // 创建一个临时顺序节点
                currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
                // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
                List<String> childrens = this.client.getChildren(LOCK_PATH);
                //由小到大排序所有子节点
                Collections.sort(childrens);
                //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
                if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
                    return true;
                }
                //监听前一个节点
                else {
                    int wz = Collections.binarySearch(childrens, currentPath.substring(6));
                    beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
                    return false;
                }
            }
            return true;
        }
    
        //等待锁,对次小节点进行监听
        private void waitForLock() {
            IZkDataListener listener = new IZkDataListener() {
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                }
    
                public void handleDataChange(String dataPath, Object data) throws Exception {
    
                }
            };
    
            // 对次小节点进行监听,排在前面的的节点增加数据删除的watcher
            this.client.subscribeDataChanges(beforePath, listener);
            if (this.client.exists(beforePath)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.client.unsubscribeDataChanges(beforePath, listener);
        }
    
    

    业务类

    public class Order {
    
        // 自增长序列
        private static int i = 0;
    
        // 按照规则生成订单编号
        public synchronized String getOrderCode() {
            Date now = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            return sdf.format(now) + ++i;
        }
    }
    

    测试10个并发

    public class LocalService implements Runnable{
    
        private static Order order = new Order();
        // 同时并发的线程数
        private static final int num = 10;
        // 按照线程数初始化倒计数器,倒计数器
        //保证线程同时执行
        private static CountDownLatch cdl = new CountDownLatch(num);
        private BaseLock baseLock = new BaseLockImpl();
    
        public void createOrder(){
            String orderCode = null;
            baseLock.getlock();
            try {
                orderCode = order.getOrderCode();
                System.out.println(orderCode);
            }catch (Exception e){
                //todo
            }finally {
                baseLock.unlock();
            }
        }
        @Override
        public void run() {
            try {
                cdl.await();
            }catch (Exception e){
                e.printStackTrace();
            }
            //创建订单
            createOrder();
        }
        public static void main(String[] args) {
            for (int i = 1; i <= num; i++) {
                // 按照线程数迭代实例化线程
                new Thread(new LocalService()).start();
                // 创建一个线程,倒计数器减1
                cdl.countDown();
            }
        }
    }
    
    zoo3.png
    zoo4.png

    curator

    curator是连接ZK应用最广泛的工具
    zk分布式锁,Master选举等等,curator包含了这些场景。
    依赖

    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
            </dependency>
    

    curator对节点的增删改查,事件监听

    public class CuratorService {
    
    
        public static CuratorFramework client = null;
        // 集群模式则是多个ip:port,ip:port
        public static final String zkServerIps = "192.168.116.150:2181";
        //缓存节点,监听节点数据变动
        final static NodeCache nodeCache = new NodeCache(client,"/path");
        // 为子节点添加watcher
        // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
        final static PathChildrenCache childrenCache = new PathChildrenCache(client, "/path", true);
    
        /**
         * basesleeptimems 初始化sleep的时间
         * maxretries 最大重试次数
         * maxsleeoms 最大重试时间
         */
        public CuratorService() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerIps)
                    .sessionTimeoutMs(10000)
                    .retryPolicy(retryPolicy)
                    .build();
            client.start();
            System.out.println("qi dong ke hu duan ...");
    
        }
    
        private void close() {
            if (client != null) {
                this.client.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
            CuratorService curatorService = new CuratorService();
            // 创建节点
            String nodePath = "/super/testNode";  // 节点路径
            byte[] data = "this is a test data".getBytes();  // 节点数据
            String result = curatorService.client.create().creatingParentsIfNeeded()  // 创建父节点,也就是会递归创建
                    .withMode(CreateMode.PERSISTENT)  // 节点类型
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 节点的acl权限
                    .forPath(nodePath, data);
    
            System.out.println(result + "节点,创建成功...");
            // 更新节点数据
            byte[] newData = "this is a new data".getBytes();
            Stat resultStat = curatorService.client.setData().withVersion(0)  // 指定数据版本
                    .forPath(nodePath, newData);  // 需要修改的节点路径以及新数据
            // 删除节点
            curatorService.client.delete()
                    .guaranteed()  // 如果删除失败,那么在后端还是会继续删除,直到成功
                    .deletingChildrenIfNeeded()  // 子节点也一并删除,也就是会递归删除
                    .withVersion(resultStat.getVersion())
                    .forPath(nodePath);
            Thread.sleep(1000);
            // 读取节点数据
            Stat stat = new Stat();
            byte[] nodeData = curatorService.client.getData().storingStatIn(stat).forPath(nodePath);
            System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
            System.out.println("该节点的数据版本号为:" + stat.getVersion());
            // 获取子节点列表
            List<String> childNodes = curatorService.client.getChildren().forPath(nodePath);
            System.out.println(nodePath + " 节点下的子节点列表:");
            // 查询某个节点是否存在,存在就会返回该节点的状态信息,如果不存在的话则返回空
            Stat statExist = curatorService.client.checkExists().forPath(nodePath);
            if (statExist == null) {
                System.out.println(nodePath + " 节点不存在");
            } else {
                System.out.println(nodePath + " 节点存在");
            }
            for (String childNode : childNodes) {
                System.out.println(childNode);
            }
            //缓存节点的数据
            curatorService.nodeCache.start(true);
            curatorService.nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // 防止节点被删除时发生错误
                    if (curatorService.nodeCache.getCurrentData() == null) {
                        System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
                        return;
                    }
                    // 获取节点最新的数据
                    String data = new String(curatorService.nodeCache.getCurrentData().getData());
                    System.out.println(curatorService.nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
                }
            });
            /**
             * 监听子节点初始化的方式
             * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
             * NORMAL:异步初始化
             * BUILD_INITIAL_CACHE:同步初始化
             * */
            curatorService.childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
            List<ChildData> childDataList = childrenCache.getCurrentData();
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    // 通过判断event type的方式来实现不同事件的触发
                    if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                        System.out.println("\n--------------\n");
                        System.out.println("子节点初始化成功");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                        //if (event.getData().getPath().trim().equals(NODE_PATH)) {}
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                        System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                        System.out.println("\n--------------\n");
                        System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                        System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                    }
                }
            });
            // 关闭客户端
            curatorService.close();
        }
    
    }
    

    分布式锁

    相关文章

      网友评论

          本文标题:zookeeper-zkclinet,curator,简单锁

          本文链接:https://www.haomeiwen.com/subject/skatvqtx.html