美文网首页我爱编程
利用zookeeper实现缓存更新功能

利用zookeeper实现缓存更新功能

作者: VIPSHOP_FCS | 来源:发表于2017-12-29 22:40 被阅读0次
      最近在用了堆外缓存之后,由于存在着缓存不更新的风险,有时候需要对缓存进行处理。该域用的是公司开发服务平台框架,有点类似Tomcat运用。
      费劲脑汁,暂时想到两种方案:
    
    1. 开发一个专门用于运维的接口,每次需要运维时,指定ip进行(若不指定ip,由于有多台部署机器,请求路由到完全不可知的机器上)

    2. 采用公司分布式配置依赖最多的,功能也相对强大的zookeeper框架。运用zk节点的内容变化时的及时通知机制。

    两种方案的比较

    优点 缺点
    方案一 实施起来简单 没有挑战性,需要知道所有ip,其次ip改变就比较难维护了
    方案二 修改节点值,客户端能及时监听到,从而能做出相应的操作,比如在本例中可以进行缓存的更新,而不需要知道客户端的ip 需要有操作zk的操作权限

    经过上面的比较,决定采用方法二(假定已申请到zk的操作权限)

    动手前准备

    1.下载zookeeper-3.3.6,解压后看到

    image

    2. 进入conf目录下,新建一个zoo.cfg文件,并写入

    # The number of milliseconds of each tick  心跳间隔 毫秒每次
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting anacknowledgement
    syncLimit=5
    # the directory where the snapshot isstored.  //镜像数据位置
    dataDir=E:\\develop-tools\\zookeeper-3.3.6\\data
    #日志位置
    dataLogDir=E:\\develop-tools\\zookeeper-3.3.6\\logs
    # the port at which the clients willconnect  客户端连接的端口
    clientPort=2181
    
    server.0=127.0.0.1:8880:7770
    #server.1=127.0.0.1:8881:7771
    #server.2=127.0.0.1:8882:7772
    

    3. 双击bin目录下的 zkServer.cmd,即可在Windows环境下启动一个zk服务端。(zkCli的用法,大家可以自行学习)

    开始动手

    为了代码优雅,由于ZooKeeper原生客户端的各类操作方法比较繁琐,用户体验不好,下面我们用Curator+Spring实现demo:

    1.封装获取zooKeeper客户端连接的工厂类:ZookeeperFactory

    package com.vip.fcs.ps.zk.demo;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.util.StringUtils;
    
    /**
     * 获取zookeeper客户端链接工厂类
     */
    public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
    
        private String zkHosts;
        // session超时
        private int sessionTimeout = 30000;
        private int connectionTimeout = 30000;
    
        // 共享一个zk链接
        private boolean singleton = true;
    
        // 全局path前缀,常用来区分不同的应用
        private String namespace;
    
        private final static String ROOT = "vip";
    
        private CuratorFramework zkClient;
    
        public void setZkHosts(String zkHosts) {
            this.zkHosts = zkHosts;
        }
    
        public void setSessionTimeout(int sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }
    
        public void setConnectionTimeout(int connectionTimeout) {
            this.connectionTimeout = connectionTimeout;
        }
    
        public void setSingleton(boolean singleton) {
            this.singleton = singleton;
        }
    
        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }
    
        @Override
        public CuratorFramework getObject() throws Exception {
            if (singleton) {
                if (zkClient == null) {
                    zkClient = create();
                    zkClient.start();
                }
                return zkClient;
            }
            return create();
        }
    
        @Override
        public Class<?> getObjectType() {
            return CuratorFramework.class;
        }
    
        @Override
        public boolean isSingleton() {
            return singleton;
        }
    
        public CuratorFramework create() throws Exception {
            if (StringUtils.isEmpty(namespace)) {
                namespace = ROOT;
            } else {
                namespace = ROOT + "/" + namespace;
            }
            return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
        }
    
        public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout,
                String namespace) {
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
            return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
                    .canBeReadOnly(true).namespace(namespace)
                    .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)).defaultData(null).build();
        }
    
        public void close() {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
    

    2.zookeeper的操作类:ZkHandler

    package com.vip.fcs.ps.zk.demo;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.TreeCache;
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
    import org.apache.curator.framework.recipes.cache.TreeCacheListener;
    import org.apache.zookeeper.CreateMode;
    import org.springframework.beans.factory.InitializingBean;
    
    /**
     * 结合spring,封装了zk的相关操作
     */
    public class ZkHandler implements InitializingBean {
        private CuratorFramework zkClient;
        private String service = "ps";
        private String version = "1.0";
    
        private TreeCache treeCache;
    
        public void setZkClient(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
    
            // 如果zk尚未启动,则启动
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }
            buildTreeCache(zkClient, getServicePath());
            // 开始监听
            treeCache.start();
    
        }
    
        /**
         * 监听的节点
         * @return
         */
        private String getServicePath() {
            return "/" + service + "/" + version;
        }
    
        private void buildTreeCache(final CuratorFramework zkClient, String path) {
            // 设置节点的cache
            treeCache = new TreeCache(zkClient, path);
            // 设置监听器和处理过程
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    ChildData data = event.getData();
                    String log = ("NODE_ADDED : " + data.getPath() + " --->DATA:" + new String(data.getData()));
                    if (data != null) {
                        switch (event.getType()) {
                        case NODE_ADDED:
                            System.out.println(log);
                            // do sth
                            break;
                        case NODE_REMOVED:
                            System.out.println(log);
                            // do sth
                            break;
                        case NODE_UPDATED:
                            System.out.println(log);
                            // do sth
                            break;
                        default:
                            break;
                        }
                    } else {
                        System.out.println("data is NULL" + event.getType());
                    }
                }
            });
        }
    
        public void createPersistentNode() throws Exception {
            if (zkClient.checkExists().forPath(getServicePath()) == null) {
                zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
                        .forPath(getServicePath());
            }
        }
    
        public void changeNodeData(String nodeData) throws Exception {
            if (zkClient.checkExists().forPath(getServicePath()) != null) {
                zkClient.setData().forPath(getServicePath(), nodeData.getBytes());
            }
        }
    
    }
    
    1. 下面我们再创建3个客户端(2个watcher+一个用来改变节点值的client)

    3.1 ZkWatcherOne

    package com.vip.fcs.ps.zk.demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class ZkWatcherOne {
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
            ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
            while (true) {
                Thread.sleep(2000L);
            }
        }
    }
    
    

    3.2 ZkWatcherTwo

    package com.vip.fcs.ps.zk.demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class ZkWatcherTwo {
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
            ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
            while (true) {
                Thread.sleep(2000L);
            }
        }
    }
    
    

    3.3 ZkSetClient

    package com.vip.fcs.ps.zk.demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class ZkSetClient {
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
            ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
            zkHandler.createPersistentNode();
            for (int i = 0; i < 10; i++) {
                zkHandler.changeNodeData("time" + i);
                Thread.sleep(2000L);
            }
        }
    }
    
    

    4.开始运行相关代码

    4.1 启动ZkWatcherOne, 控制台打印日志

    NODE_ADDED : /ps/1.0 --->DATA:

    4.2 启动ZkWatcherTwo, 控制台打印日志

    NODE_ADDED : /ps/1.0 --->DATA:

    4.3 这时候启动ZkSetClient,在ZkWatcherOne和ZkWatcherTwo的控制台最终打印出如下的日志

    image

    由此可见,当一个节点值改变时,其他节点都能接收到监听事件,并能够做出相应的操作。在监听事件中,我们可以对本机缓存进行更新,由此达到了我们的目的。
    4.4 相关配置文件 applicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
           default-lazy-init="false">
    
        <!-- zookeeper -->
        <bean id="zookeeper" class="com.vip.fcs.ps.zk.demo.ZookeeperFactory"
              destroy-method="close">
            <property name="zkHosts"
                      value="127.0.0.1:2181"/>
            <property name="namespace" value="ps.api"/>
            <property name="connectionTimeout" value="3000"/>
            <property name="sessionTimeout" value="3000"/>
            <property name="singleton" value="true"/>
        </bean>
    
        <!-- zookeeper -->
        <bean id="zkHandler" class="com.vip.fcs.ps.zk.demo.ZkHandler">
            <property name="zkClient"
                      ref="zookeeper"/>
        </bean>
    </beans>
    

    4.4 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.fcs.ps.demo</groupId>
       <artifactId>zk-demo</artifactId>
       <version>1.0-SNAPSHOT</version>
       <properties>
           <spring.version>4.0.7.RELEASE</spring.version>
       </properties>
       <dependencies>
           <dependency>
               <groupId>commons-pool</groupId>
               <artifactId>commons-pool</artifactId>
               <version>1.6</version>
           </dependency>
           <dependency>
               <groupId>org.springframework</groupId>
               <artifactId>spring-context</artifactId>
               <version>4.0.9.RELEASE</version>
           </dependency>
    
           <dependency>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
               <version>3.4.6</version>
           </dependency>
           <dependency>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
               <version>3.4.9</version>
           </dependency>
           <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-framework</artifactId>
               <version>4.0.0</version>
           </dependency>
           <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-test</artifactId>
               <version>4.0.0</version>
           </dependency>
           <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-recipes</artifactId>
               <version>4.0.0</version>
           </dependency>
           <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-x-discovery</artifactId>
               <version>4.0.0</version>
           </dependency>
    
       </dependencies>
    
    </project>
    

    拓展 :zookeeper如此强大,我们进一步了解一下其优缺点,已经相关的应用场景。

    zookeeper 特点及优点 :

    1. zookeeper是一个精简的文件系统。这点它和hadoop有点像,但是zookeeper这个文件系统是管理小文件的,而hadoop是管理超大文件的。

    2. zookeeper提供了丰富的“构件”,这些构件可以实现很多协调数据结构和协议的操作。例如:分布式队列、分布式锁以及一组同级节点的“领导者选举”算法。

    3. zookeeper是高可用的,它本身的稳定性是相当之好,分布式集群完全可以依赖zookeeper集群的管理,利用zookeeper避免分布式系统的单点故障的问题。

    4. zookeeper采用了松耦合的交互模式。这点在zookeeper提供分布式锁上表现最为明显,zookeeper可以被用作一个约会机制,让参入的进程不在了解其他进程的(或网络)的情况下能够彼此发现并进行交互,参入的各方甚至不必同时存在,只要在zookeeper留下一条消息,在该进程结束后,另外一个进程还可以读取这条信息,从而解耦了各个节点之间的关系。

    5. zookeeper为集群提供了一个共享存储库,集群可以从这里集中读写共享的信息,避免了每个节点的共享操作编程,减轻了分布式系统的开发难度。

    6. zookeeper的设计采用的是观察者的设计模式,zookeeper主要是负责存储和管理大家关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式。

    个人认为,zookeeper最有价值的东西也许是内容变化能够及时通知到监听者,结合上面zookeeper的特点,目前zookeeper的使用场景有:

    1.数据发布与订阅(配置中心)(适用于)

    2.负载均衡

    3.命名服务(Naming Service)(服务注册)

    4.分布式通知/协调

    5.集群管理与Master选举(hbase用到)

    6.分布式锁

    7.分布式队列

    但是zookeeper也有很多缺点:

    1. zookeeper不是为高可用性设计的

    2. zookeeper的选举过程速度很慢。网络实际上常常是会出现隔离等不完整状态的,而zookeeper对那种情况非常敏感。一旦出现网络隔离,zookeeper就要发起选举流程。zookeeper的选举流程通常耗时30到120秒,期间zookeeper由于没有master,都是不可用的。对于网络里面偶尔出现的,比如半秒一秒的网络隔离,zookeeper会由于选举过程,而把不可用时间放大几十倍

    3. zookeeper的性能是有限的

    4. zookeeper的权限控制非常薄弱

    zookeeper其实就是一个高可用的服务协调框架,不能作为存储,每次写入都必须集群n/2+1的集群写入完成之后才算完成,性能自己就可以想象了,而且其每个节点只能存储1M的数据,同时其节点目录也不宜过多。

    有兴趣的童鞋,可以参考深入研究zookeeper,充分利用起优点,开发出更多的应用场景来。

            作者:王文雅     唯品会java后端开发工程师

    相关文章

      网友评论

        本文标题:利用zookeeper实现缓存更新功能

        本文链接:https://www.haomeiwen.com/subject/fjwwgxtx.html