Springboot整合curator访问zookeeper
POM
<!-- zookeeper curator客户端 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
配置文件application.yml
#============== zookeeper ===================
zookeeper:
server: 192.168.8.105:2181
sessionTimeoutMs: 6000
connectionTimeoutMs: 6000
maxRetries: 3
baseSleepTimeMs: 1000
测试代码
@Service
public class CuratorTestServiceImpl implements CuratorTestService {
@Value("${zookeeper.server}")
private String zookeeperServer;
@Value(("${zookeeper.sessionTimeoutMs}"))
private int sessionTimeoutMs;
@Value("${zookeeper.connectionTimeoutMs}")
private int connectionTimeoutMs;
@Value("${zookeeper.maxRetries}")
private int maxRetries;
@Value("${zookeeper.baseSleepTimeMs}")
private int baseSleepTimeMs;
@Override
public void testCurator() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zookeeperServer)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)).build();
try {
/**
* 创建会话
*/
client.start();
/**
* 创建节点
* 注意:
* 1 除非指明创建节点的类型,默认是持久节点
* 2 ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
*/
client.create().forPath("/China");//创建一个初始内容为空的节点
client.create().forPath("/America", "zhangsan".getBytes());
client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点
/**
* 异步创建节点
* 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理
*/
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}).forPath("/async-curator-zookeeper");
/**
* 获取节点内容
*/
byte[] data = client.getData().forPath("/America");
System.out.println(new String(data));
byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
System.out.println(new String(data2));
/**
* 更新数据
*/
Stat stat = client.setData().forPath("/America");
//client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
client.setData().forPath("/America", "lisi".getBytes());
/**
* 删除节点
*/
client.delete().forPath("/China");//只能删除叶子节点
client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
//client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
client.delete().forPath("/America");
//client.delete().guaranteed().forPath("/America");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
} catch (Exception e) {
e.printStackTrace();
}
}
}
Curator framework实例
使用CuratorFrameworkFactory产生framework实例。 CuratorFrameworkFactory 既提供了factory方法也提供了builder来创建实例。CuratorFrameworkFactory是线程安全的。你应该在应用中为单一的ZooKeeper集群共享唯一的CuratorFramework实例
。
工厂方法(newClient())提供了一个简单的方式创建实例。Builder可以使用更多的参数控制生成的实例。一旦生成framework实例, 必须调用start方法启动它。应用结束时应该调用close方法关闭它。
Curator framework原理: 单一长链接
zookeeper分布式锁
POM
<!-- curator分布式锁 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
配置管理类CuratorConfiguration.java
/**
* zk连接配置管理
* @author Suoron
*/
@Component
public class CuratorConfiguration {
@Value("${zookeeper.server}")
private String zookeeperServer;
@Value(("${zookeeper.sessionTimeoutMs}"))
private int sessionTimeoutMs;
@Value("${zookeeper.connectionTimeoutMs}")
private int connectionTimeoutMs;
@Value("${zookeeper.maxRetries}")
private int maxRetries;
@Value("${zookeeper.baseSleepTimeMs}")
private int baseSleepTimeMs;
@Bean
public CuratorFramework createZkClient(){
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zookeeperServer)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)).build();
/**
* 创建会话
*/
client.start();
return client;
}
}
分布式锁测试Service
/**
* 火车票抢票模拟
* @author Suoron
*/
@Service
public class CuratorLockServiceImpl implements CuratorLockService {
@Resource
CuratorFramework zkclient;
@Override
public void testDistribute() {
//创建分布式锁
final InterProcessMutex lock = new InterProcessMutex(zkclient, "/locktest");
//模拟多线程,可以从多个应用(tomcat)发起请求
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
private int id;
public Runnable setArgs(int x){
id = x;
return this;
}
@Override
public void run() {
try {
//加锁
lock.acquire();
//lock.acquire(5, TimeUnit.SECONDS) == true;
//-------------业务处理开始
System.out.println(System.currentTimeMillis()+":当前抢票用户:"+id);
//TODO 数据库操作
//-------------业务处理结束
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释放
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.setArgs(i),"t" + i).start();
}
try {
Thread.sleep(100*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
zookeeper 栅栏
应用场景:
马场赛马,每匹马相当于一个应用程序,必须等全部数量的马准备就绪才能开始赛马
赛马封装HorseRace.java
/**
*
* 赛马算法
* 多个进程同一时间开始
* @author Suoron
* 2018/09/27
*
*/
public interface HorseRace {
/* 创建赛马 */
public void createRace(String rootPath) throws Exception ;
/**
* 进入比赛
* @param rootPath 比赛根路径
* @param name 马匹名称
* @param size 本次赛马数
* @return boolean 返回是否准备就绪状态
*
*/
public boolean enterRace(String rootPath,String name,int size);
/* 退出比赛 */
public boolean exitRace(String rootPath,String name);
/* 退出比赛(等待所有人一起退出) */
public boolean togetherExitRace(String rootPath,String name);
}
赛马算法实现类HorseRaceImpl.java
public class HorseRaceImpl implements HorseRace {
@Resource
CuratorFramework zkClient;
@Override
public boolean togetherExitRace(String rootPath,String name){
//先删除自己的节点
ZkClientHelper.deleteNode(zkClient,rootPath+"/"+name,0);
//此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。
while(true){
int size = ZkClientHelper.getChildrenSize(zkClient, rootPath);
if(size != 0) {
System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return true;
}
}
}
@Override
public boolean enterRace(String rootPath,String name,int size) {
try{
this.createRace(rootPath);
}catch(Exception ex){
ex.printStackTrace();
return false;
}
String node = ZkClientHelper.createTempNode(zkClient, rootPath+"/"+name, "1");
while(true) {
int n = ZkClientHelper.getChildrenSize(zkClient, rootPath);
if (n != size) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return true;
}
}
}
@Override
public void createRace(String rootPath) throws Exception {
if(!ZkClientHelper.checkExists(zkClient,rootPath)){
String rs = ZkClientHelper.creatingParentsNode(zkClient, rootPath, "1");
//System.out.println(rs);
}
}
@Override
public boolean exitRace(String rootPath, String name) {
//先删除自己的节点
ZkClientHelper.deleteNode(zkClient,rootPath+"/"+name,0);
return true;
}
}
工具类ZkClientHelper
public class ZkClientHelper {
public static boolean deleteNode(CuratorFramework zkClient,String path,int version) {
try {
zkClient.delete().forPath(path);
//System.out.println("delete path:"+ path + "success");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static int getChildrenSize(CuratorFramework zkClient,String path) {
try {
return zkClient.getChildren().watched().forPath(path).size();
} catch (KeeperException e) {
e.printStackTrace();
return -1;
} catch (Exception e) {
e.printStackTrace();
return -1;
}
}
public static String createTempNode(CuratorFramework zkClient,String path,String data) {
try {
String node = zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path,data.getBytes());
//System.out.println(rs);
//System.out.println("node "+ path +" with data is created,return node "+node);
return node;
} catch (KeeperException e) {
e.printStackTrace();
return "ERROR";
} catch (Exception e) {
e.printStackTrace();
return "ERROR";
}
}
public static String creatingParentsNode(CuratorFramework zkClient, String path, String data){
try {
String node = zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path,data.getBytes());
//System.out.println(rs);
return node;
} catch (Exception e) {
e.printStackTrace();
return "ERROR";
}
}
public static boolean checkExists(CuratorFramework zkClient,String path){
try {
Stat stat = zkClient.checkExists().forPath(path);
if(stat!=null) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class HorseRaceTest {
@Resource
HorseRace horseRace;
class MyThread extends Thread{
private int horseId;
private MyThread(int horseId){
this.horseId = horseId;
}
public void run(){
//先业务处理,后准备赛马
System.out.println("do something before!!!"+horseId);
horseRace.enterRace("/hotel/download", "MyHorse"+horseId, 6);
horseRace.exitRace("/hotel/download", "MyHorse"+horseId);
}
}
class MyThread1 extends Thread{
private int horseId;
private MyThread1(int horseId){
this.horseId = horseId;
}
public void run(){
horseRace.enterRace("/hotel/download", "MyHorse"+horseId, 6);
horseRace.exitRace("/hotel/download", "MyHorse"+horseId);
//先赛马,后业务处理
System.out.println("do something after!!!"+horseId);
}
}
@Test
public void testRace(){
MyThread mythread1 = new MyThread(1); //1号赛马
MyThread mythread2 = new MyThread(2); //2号赛马
MyThread mythread3 = new MyThread(3); //3号赛马
MyThread mythread4 = new MyThread(4); //4号赛马
MyThread mythread5 = new MyThread(5); //5号赛马
MyThread1 mythread6 = new MyThread1(6); //特殊的6号赛马
mythread6.start();
mythread5.start();
mythread4.start();
mythread3.start();
mythread2.start();
mythread1.start();
try{
Thread.sleep(10*1000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}
网友评论