美文网首页
ZK springboot整合zookeeper

ZK springboot整合zookeeper

作者: 小P聊技术 | 来源:发表于2021-03-30 08:26 被阅读0次

    1 资源

    资源信息 版本号 备注
    zookeeper 3.4.10 IP: 192.168.51.4
    springboot 2.1.5.RELEASE
    prettyZoo 2.0 zookeeper可视化工具

    zookeeper可视化工具 下载

    springboot-zookeeper-demo 源码 下载

    2 zookeeper安装

    需要安装zookeeper,如果未安装,可参考博文:

    ZK zookeeper单机安装与配置

    3 springboot整合

    3.1 pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.5.RELEASE</version>
            <!--        <version>2.3.2.RELEASE</version>-->
            <relativePath />
        </parent>
    
        <groupId>com.auksat.demo</groupId>
        <artifactId>springboot-zookeeper-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <zookeeper.version>3.4.10</zookeeper.version>
            <curator.version>2.11.1</curator.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>${zookeeper.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    <!--        <dependency>-->
    <!--            <groupId>org.apache.curator</groupId>-->
    <!--            <artifactId>curator-framework</artifactId>-->
    <!--            <version>${curator.version}</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.apache.curator</groupId>-->
    <!--            <artifactId>curator-recipes</artifactId>-->
    <!--            <version>${curator.version}</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.apache.curator</groupId>-->
    <!--            <artifactId>curator-client</artifactId>-->
    <!--            <version>${curator.version}</version>-->
    <!--        </dependency>-->
        </dependencies>
    
    </project>
    

    3.2 配置信息

    3.2.1 application.yml

    zookeeper:
      address: 192.168.51.4:2181
      timeout: 4000
    

    3.2.2 配置类

    package com.auskat.demo.zookeeper.config;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 类文件: ZookeeperConfig
     * <p>
     * <p>
     * 类描述:
     * <p>
     * 作     者: AusKa_T
     * <p>
     * 日     期: 2021/3/24 0024
     * <p>
     * 时     间: 9:23
     * <p>
     */
    @Configuration
    public class ZookeeperConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class);
    
        @Value("${zookeeper.address}")
        private String connectString;
    
        @Value("${zookeeper.timeout}")
        private int sessionTimeout;
    
        public String getConnectString() {
            return connectString;
        }
    
        public void setConnectString(String connectString) {
            this.connectString = connectString;
        }
    
        public int getSessionTimeout() {
            return sessionTimeout;
        }
    
        public void setSessionTimeout(int sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }
    
        @Bean(name = "zkClient")
        public ZooKeeper zkClient() {
            ZooKeeper zooKeeper = null;
            try {
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        // 如果收到了服务端的响应事件,说明连接成功
                        if (Event.KeeperState.SyncConnected == event.getState()) {
                            countDownLatch.countDown();
                        }
                    }
                });
                countDownLatch.await();
                logger.info("  初始化ZooKeeper连接状态: {}",zooKeeper.getState());
            } catch (Exception e) {
                logger.error(" 初始化Zookeeper连接状态异常: {}",e.getMessage());
            }
            return  zooKeeper;
        }
    
    
    }
    

    3.3 自定义监听

    package com.auskat.demo.zookeeper.watch;
    
    import com.auskat.demo.zookeeper.config.ZookeeperConfig;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 类文件: WatcherApi
     * <p>
     * <p>
     * 类描述:
     * <p>
     * 作     者: AusKa_T
     * <p>
     * 日     期: 2021/3/24 0024
     * <p>
     * 时     间: 9:40
     * <p>
     */
    public class CustomWatcher implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(CustomWatcher.class);
    
        @Override
        public void process(WatchedEvent event) {
            logger.info("监听事件的状态: {}",event.getState());
            logger.info("监听事件的路径: {}",event.getPath());
            logger.info("监听事件的类型: {}",event.getType());
        }
    
    }
    

    3.4 自定义工具类

    package com.auskat.demo.zookeeper.utils;
    
    import com.auskat.demo.zookeeper.config.ZookeeperConfig;
    import com.auskat.demo.zookeeper.watch.CustomWatcher;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.List;
    
    /**
     * 类文件: ZkUtil
     * <p>
     * <p>
     * 类描述:
     * <p>
     * 作     者: AusKa_T
     * <p>
     * 日     期: 2021/3/24 0024
     * <p>
     * 时     间: 9:35
     * <p>
     */
    @Component
    public class ZkUtil {
    
        private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class);
    
        @Autowired
        private ZooKeeper zkClient;
    
        @Autowired
        private ZookeeperConfig ZooKeeper;
    
    
    
        /**
         * 创建持久化节点
         * -- 客户端断开连接后,节点数据持久化在磁盘上,不会被删除。
         *
         * @param path 路径
         * @param data 数据
         */
        public boolean createPerNode(String path, String data) {
            try {
                // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
                zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return true;
            } catch (Exception e) {
                logger.error("创建持久化节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
                return false;
            }
        }
    
        /**
         * 创建临时节点
         * -- 客户端断开连接后,节点将被删除。
         *
         * @param path 路径
         * @param data 数据
         */
        public boolean createTmpNode(String path, String data) {
            try {
                // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
                zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return true;
            } catch (Exception e) {
                logger.error("创建临时节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
                return false;
            }
        }
    
        /**
         * 创建自定义节点
         *
         * @param path       路径
         * @param data       数据
         * @param acl        节点权限
         * @param createMode 节点类型
         */
        public boolean createNode(String path, String data, List<ACL> acl, CreateMode createMode) {
            try {
                // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
                zkClient.create(path, data.getBytes(), acl, createMode);
                return true;
            } catch (Exception e) {
                logger.error("创建节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
                return false;
            }
        }
    
    
        /**
         * 修改节点
         *
         * @param path 路径
         * @param data 数据
         */
        public boolean updateNode(String path, String data) {
            try {
                // zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
                // version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
                zkClient.setData(path, data.getBytes(), -1);
                return true;
            } catch (Exception e) {
                logger.error("修改节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
                return false;
            }
        }
    
        /**
         * 删除节点
         *
         * @param path 路径
         */
        public boolean deleteNode(String path) {
            try {
                // version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
                zkClient.delete(path, -1);
                return true;
            } catch (Exception e) {
                logger.error("删除节点异常,路径: {}, 异常: {}", path, e);
                return false;
            }
        }
    
        /**
         * 判断指定节点是否存在
         *
         * @param path      路径
         * @param needWatch 指定是否复用zookeeper中默认的Watcher
         * @return 结果
         */
        public Stat exists(String path, boolean needWatch) {
            try {
                return zkClient.exists(path, needWatch);
            } catch (Exception e) {
                logger.error("判断指定节点是否存在异常,路径: {}, 异常: {}", path, e);
                return null;
            }
        }
    
        /**
         * 检测结点是否存在 并设置监听事件
         * 三种监听类型: 创建,删除,更新
         *
         * @param path    路径
         * @param watcher 传入指定的监听类
         */
        public Stat exists(String path, Watcher watcher) {
            try {
                return zkClient.exists(path, watcher);
            } catch (Exception e) {
                logger.error("判断指定节点是否存在异常,路径: {}, 异常: {}", path, e);
                return null;
            }
        }
    
    
        /**
         * 获取当前节点的子节点(不包含孙子节点)
         *
         * @param path 父节点path
         */
        public List<String> getChildren(String path) throws KeeperException, InterruptedException {
            List<String> list = zkClient.getChildren(path, false);
            return list;
        }
    
        /**
         * 获取指定节点的值
         *
         * @param path 路径
         */
        public String getData(String path, Watcher watcher) {
            try {
                Stat stat = new Stat();
                byte[] bytes = zkClient.getData(path, watcher, stat);
                return new String(bytes);
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    
    //    /**
    //     * 注册监听
    //     * @param watcher 监听类
    //     */
    //    public void registerWatch(Watcher watcher) throws IOException {
    //        ZooKeeper zooKeeper = new ZooKeeper(ZooKeeper.getConnectString(), ZooKeeper.getSessionTimeout(), watcher);
    //    }
    
    
    }
    

    4 功能测试

    4.1 Idea调试

    package com.auskat.demo.zookeeper;
    
    import com.auskat.demo.zookeeper.utils.ZkUtil;
    import com.auskat.demo.zookeeper.watch.CustomWatcher;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * 类文件: ApplicationTest
     * <p>
     * <p>
     * 类描述:
     * <p>
     * 作     者: AusKa_T
     * <p>
     * 日     期: 2021/3/24 0024
     * <p>
     * 时     间: 9:59
     * <p>
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTest {
    
        @Autowired
        private ZkUtil zkUtil;
    
        /**
         * 新增节点
         */
        @Test
        public void testCreateNode() {
            zkUtil.createPerNode("/demo", "auskat");
        }
    
        /**
         * 修改节点
         */
        @Test
        public void testUpdateNode() {
            zkUtil.updateNode("/demo", "auskat-2");
        }
    
        /**
         * 获取节点是否存在
         * 自定义监听
         */
        @Test
        public void exists() {
            zkUtil.exists("/demo", new CustomWatcher());
        }
    
        /**
         * 获取节点数据
         * 自定义监听
         */
        @Test
        public void getData() throws InterruptedException {
            String data = zkUtil.getData("/demo", new CustomWatcher());
            System.out.println(data);
            zkUtil.updateNode("/demo", "auskat-3");
            Thread.sleep(Long.MAX_VALUE);
        }
    
        /**
         * 删除节点
         */
        @Test
        public void testDeleteNode() {
            zkUtil.deleteNode("/demo");
        }
    
    
    }
    

    4.2 可视化工具查看

    使用 prettyZoo zookeeper可视化工具查看节点信息

    4.2.1 建立连接

    在这里插入图片描述 在这里插入图片描述

    4.2.2 查看数据

    在这里插入图片描述

    5 相关信息

    • 博文不易,辛苦各位猿友点个关注和赞,感谢

    相关文章

      网友评论

          本文标题:ZK springboot整合zookeeper

          本文链接:https://www.haomeiwen.com/subject/krdwhltx.html