1. 什么是分布式屏障
就是规定队列中元素的数量达到指定数量后,才可以打破屏障。类比多线程屏障CyclicBarrier理解。
2. 思路
(1)创建持久化根节点。
(2)创建临时顺序子节点,注册孩子节点变化的监听器,当孩子节点数量等于屏障parties数量就释放锁,破坏屏障。
(3)如果没有达到parties,就将当前线程进行等待。
3. 接口定义
public interface IZkBarrier {
// 建立连接
public void connect();
// 创建根节点
public void createRootNode();
// 执行等待
public void doAwait();
}
4. 实现
public class ZkBarrier implements IZkBarrier{
// "IP:2181"
private String zkAddr;
// 4000
private int timeout;
// 持久化根节点路径 "/zk-barrier"
private String rootPath;
private ZkClient client;
// 控制线程是否执行的乐观锁
private CountDownLatch cdl = new CountDownLatch(1);
// 屏障线程的数量
private int parties;
public ZkBarrier(String zkAddr, int timeout, String rootPath, int parties) {
this.zkAddr = zkAddr;
this.timeout = timeout;
this.rootPath = rootPath;
this.parties = parties;
}
/**
* 建立连接
*/
@Override
public void connect() {
client = new ZkClient(zkAddr, timeout);
}
/**
* 创建持久根节点
*/
@Override
public void createRootNode() {
if (client.exists(rootPath)) {
return;
} else {
client.create(rootPath, new Integer(parties), CreateMode.PERSISTENT);
}
}
/**
* 执行等待
*/
@Override
public void doAwait() {
client.create(rootPath + "/barrier_", new String(rootPath + "/barrier_"), CreateMode.EPHEMERAL_SEQUENTIAL);
client.subscribeChildChanges(rootPath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> childList) throws Exception {
if (childList != null && childList.size() >= parties) {
cdl.countDown();
}
}
});
int count = client.getChildren(rootPath).size();
try {
if (count < parties) {
cdl.await();
} else {
breakBarrier();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void breakBarrier() {
client.unsubscribeAll();
client.deleteRecursive(rootPath);
client.delete(rootPath);
client.close();
}
测试:
public static void main(String[] args) {
ZkBarrier zb = new ZkBarrier("ip:2181", 4000, "/zk-barrier", 3);
zb.connect();
zb.createRootNode();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "已到达");
zb.doAwait();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "已到达");
zb.doAwait();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread2");
t1.start();
t2.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "已到达");
zb.doAwait();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread3");
t3.start();
}
结果:
Thread1已到达
Thread2已到达
Thread3已到达
问题:
- 为什么使用ZkClient客户端插件而不是原生态Zookeeper
用原生态API实现花费很长时间,踩到了很多坑:
①监听事件只能执行一次,然后就失效了。
②监听事件不生效,排查也花了一段时间。
③原生态API中很多的异常处理,NoNode或者NodeExists等异常,这些异常处理很麻烦。 - 让线程的等待,我思考了很久,是采用死循环轮询,还是让其阻塞。最后我选择了阻塞。他们之间的区别还在思考。
网友评论