最近在用了堆外缓存之后,由于存在着缓存不更新的风险,有时候需要对缓存进行处理。该域用的是公司开发服务平台框架,有点类似Tomcat运用。
费劲脑汁,暂时想到两种方案:
-
开发一个专门用于运维的接口,每次需要运维时,指定ip进行(若不指定ip,由于有多台部署机器,请求路由到完全不可知的机器上)
-
采用公司分布式配置依赖最多的,功能也相对强大的zookeeper框架。运用zk节点的内容变化时的及时通知机制。
两种方案的比较
优点 | 缺点 | |
---|---|---|
方案一 | 实施起来简单 | 没有挑战性,需要知道所有ip,其次ip改变就比较难维护了 |
方案二 | 修改节点值,客户端能及时监听到,从而能做出相应的操作,比如在本例中可以进行缓存的更新,而不需要知道客户端的ip | 需要有操作zk的操作权限 |
经过上面的比较,决定采用方法二(假定已申请到zk的操作权限)
动手前准备
1.下载zookeeper-3.3.6,解压后看到
image2. 进入conf目录下,新建一个zoo.cfg文件,并写入
# The number of milliseconds of each tick 心跳间隔 毫秒每次
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting anacknowledgement
syncLimit=5
# the directory where the snapshot isstored. //镜像数据位置
dataDir=E:\\develop-tools\\zookeeper-3.3.6\\data
#日志位置
dataLogDir=E:\\develop-tools\\zookeeper-3.3.6\\logs
# the port at which the clients willconnect 客户端连接的端口
clientPort=2181
server.0=127.0.0.1:8880:7770
#server.1=127.0.0.1:8881:7771
#server.2=127.0.0.1:8882:7772
3. 双击bin目录下的 zkServer.cmd,即可在Windows环境下启动一个zk服务端。(zkCli的用法,大家可以自行学习)
开始动手
为了代码优雅,由于ZooKeeper原生客户端的各类操作方法比较繁琐,用户体验不好,下面我们用Curator+Spring实现demo:
1.封装获取zooKeeper客户端连接的工厂类:ZookeeperFactory
package com.vip.fcs.ps.zk.demo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;
/**
* 获取zookeeper客户端链接工厂类
*/
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
private String zkHosts;
// session超时
private int sessionTimeout = 30000;
private int connectionTimeout = 30000;
// 共享一个zk链接
private boolean singleton = true;
// 全局path前缀,常用来区分不同的应用
private String namespace;
private final static String ROOT = "vip";
private CuratorFramework zkClient;
public void setZkHosts(String zkHosts) {
this.zkHosts = zkHosts;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public void setSingleton(boolean singleton) {
this.singleton = singleton;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
@Override
public CuratorFramework getObject() throws Exception {
if (singleton) {
if (zkClient == null) {
zkClient = create();
zkClient.start();
}
return zkClient;
}
return create();
}
@Override
public Class<?> getObjectType() {
return CuratorFramework.class;
}
@Override
public boolean isSingleton() {
return singleton;
}
public CuratorFramework create() throws Exception {
if (StringUtils.isEmpty(namespace)) {
namespace = ROOT;
} else {
namespace = ROOT + "/" + namespace;
}
return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
}
public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout,
String namespace) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
.canBeReadOnly(true).namespace(namespace)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)).defaultData(null).build();
}
public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}
2.zookeeper的操作类:ZkHandler
package com.vip.fcs.ps.zk.demo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.InitializingBean;
/**
* 结合spring,封装了zk的相关操作
*/
public class ZkHandler implements InitializingBean {
private CuratorFramework zkClient;
private String service = "ps";
private String version = "1.0";
private TreeCache treeCache;
public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}
@Override
public void afterPropertiesSet() throws Exception {
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
buildTreeCache(zkClient, getServicePath());
// 开始监听
treeCache.start();
}
/**
* 监听的节点
* @return
*/
private String getServicePath() {
return "/" + service + "/" + version;
}
private void buildTreeCache(final CuratorFramework zkClient, String path) {
// 设置节点的cache
treeCache = new TreeCache(zkClient, path);
// 设置监听器和处理过程
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
String log = ("NODE_ADDED : " + data.getPath() + " --->DATA:" + new String(data.getData()));
if (data != null) {
switch (event.getType()) {
case NODE_ADDED:
System.out.println(log);
// do sth
break;
case NODE_REMOVED:
System.out.println(log);
// do sth
break;
case NODE_UPDATED:
System.out.println(log);
// do sth
break;
default:
break;
}
} else {
System.out.println("data is NULL" + event.getType());
}
}
});
}
public void createPersistentNode() throws Exception {
if (zkClient.checkExists().forPath(getServicePath()) == null) {
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(getServicePath());
}
}
public void changeNodeData(String nodeData) throws Exception {
if (zkClient.checkExists().forPath(getServicePath()) != null) {
zkClient.setData().forPath(getServicePath(), nodeData.getBytes());
}
}
}
- 下面我们再创建3个客户端(2个watcher+一个用来改变节点值的client)
3.1 ZkWatcherOne
package com.vip.fcs.ps.zk.demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ZkWatcherOne {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
while (true) {
Thread.sleep(2000L);
}
}
}
3.2 ZkWatcherTwo
package com.vip.fcs.ps.zk.demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ZkWatcherTwo {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
while (true) {
Thread.sleep(2000L);
}
}
}
3.3 ZkSetClient
package com.vip.fcs.ps.zk.demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ZkSetClient {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
zkHandler.createPersistentNode();
for (int i = 0; i < 10; i++) {
zkHandler.changeNodeData("time" + i);
Thread.sleep(2000L);
}
}
}
4.开始运行相关代码
4.1 启动ZkWatcherOne, 控制台打印日志
NODE_ADDED : /ps/1.0 --->DATA:
4.2 启动ZkWatcherTwo, 控制台打印日志
NODE_ADDED : /ps/1.0 --->DATA:
4.3 这时候启动ZkSetClient,在ZkWatcherOne和ZkWatcherTwo的控制台最终打印出如下的日志
image由此可见,当一个节点值改变时,其他节点都能接收到监听事件,并能够做出相应的操作。在监听事件中,我们可以对本机缓存进行更新,由此达到了我们的目的。
4.4 相关配置文件 applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
default-lazy-init="false">
<!-- zookeeper -->
<bean id="zookeeper" class="com.vip.fcs.ps.zk.demo.ZookeeperFactory"
destroy-method="close">
<property name="zkHosts"
value="127.0.0.1:2181"/>
<property name="namespace" value="ps.api"/>
<property name="connectionTimeout" value="3000"/>
<property name="sessionTimeout" value="3000"/>
<property name="singleton" value="true"/>
</bean>
<!-- zookeeper -->
<bean id="zkHandler" class="com.vip.fcs.ps.zk.demo.ZkHandler">
<property name="zkClient"
ref="zookeeper"/>
</bean>
</beans>
4.4 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fcs.ps.demo</groupId>
<artifactId>zk-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spring.version>4.0.7.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
</project>
拓展 :zookeeper如此强大,我们进一步了解一下其优缺点,已经相关的应用场景。
zookeeper 特点及优点 :
1. zookeeper是一个精简的文件系统。这点它和hadoop有点像,但是zookeeper这个文件系统是管理小文件的,而hadoop是管理超大文件的。
2. zookeeper提供了丰富的“构件”,这些构件可以实现很多协调数据结构和协议的操作。例如:分布式队列、分布式锁以及一组同级节点的“领导者选举”算法。
3. zookeeper是高可用的,它本身的稳定性是相当之好,分布式集群完全可以依赖zookeeper集群的管理,利用zookeeper避免分布式系统的单点故障的问题。
4. zookeeper采用了松耦合的交互模式。这点在zookeeper提供分布式锁上表现最为明显,zookeeper可以被用作一个约会机制,让参入的进程不在了解其他进程的(或网络)的情况下能够彼此发现并进行交互,参入的各方甚至不必同时存在,只要在zookeeper留下一条消息,在该进程结束后,另外一个进程还可以读取这条信息,从而解耦了各个节点之间的关系。
5. zookeeper为集群提供了一个共享存储库,集群可以从这里集中读写共享的信息,避免了每个节点的共享操作编程,减轻了分布式系统的开发难度。
6. zookeeper的设计采用的是观察者的设计模式,zookeeper主要是负责存储和管理大家关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式。
个人认为,zookeeper最有价值的东西也许是内容变化能够及时通知到监听者,结合上面zookeeper的特点,目前zookeeper的使用场景有:
1.数据发布与订阅(配置中心)(适用于)
2.负载均衡
3.命名服务(Naming Service)(服务注册)
4.分布式通知/协调
5.集群管理与Master选举(hbase用到)
6.分布式锁
7.分布式队列
但是zookeeper也有很多缺点:
-
zookeeper不是为高可用性设计的
-
zookeeper的选举过程速度很慢。网络实际上常常是会出现隔离等不完整状态的,而zookeeper对那种情况非常敏感。一旦出现网络隔离,zookeeper就要发起选举流程。zookeeper的选举流程通常耗时30到120秒,期间zookeeper由于没有master,都是不可用的。对于网络里面偶尔出现的,比如半秒一秒的网络隔离,zookeeper会由于选举过程,而把不可用时间放大几十倍
-
zookeeper的性能是有限的
-
zookeeper的权限控制非常薄弱
zookeeper其实就是一个高可用的服务协调框架,不能作为存储,每次写入都必须集群n/2+1的集群写入完成之后才算完成,性能自己就可以想象了,而且其每个节点只能存储1M的数据,同时其节点目录也不宜过多。
有兴趣的童鞋,可以参考深入研究zookeeper,充分利用起优点,开发出更多的应用场景来。
作者:王文雅 唯品会java后端开发工程师
网友评论