美文网首页
zookeeper实现分布式屏障

zookeeper实现分布式屏障

作者: sunpy | 来源:发表于2020-05-24 11:22 被阅读0次

1. 什么是分布式屏障

就是规定队列中元素的数量达到指定数量后,才可以打破屏障。类比多线程屏障CyclicBarrier理解。

2. 思路

(1)创建持久化根节点。
(2)创建临时顺序子节点,注册孩子节点变化的监听器,当孩子节点数量等于屏障parties数量就释放锁,破坏屏障。
(3)如果没有达到parties,就将当前线程进行等待。

3. 接口定义

public interface IZkBarrier {

    // 建立连接
    public void connect();
    // 创建根节点
    public void createRootNode();
    // 执行等待
    public void doAwait();
}

4. 实现

public class ZkBarrier implements IZkBarrier{

    // "IP:2181"
    private String zkAddr;
    // 4000
    private int timeout;
    // 持久化根节点路径 "/zk-barrier" 
    private String rootPath;
    private ZkClient client;
    
    // 控制线程是否执行的乐观锁 
    private CountDownLatch cdl = new CountDownLatch(1);
    // 屏障线程的数量
    private int parties;
    
    public ZkBarrier(String zkAddr, int timeout, String rootPath, int parties) {
        this.zkAddr = zkAddr;
        this.timeout = timeout;
        this.rootPath = rootPath;
        this.parties = parties;
    }
    
    /**
     * 建立连接
     */
    @Override
    public void connect() {
        client = new ZkClient(zkAddr, timeout);
    }
    
    /**
     * 创建持久根节点
     */
    @Override
    public void createRootNode() {
        if (client.exists(rootPath)) {
            return;
        } else {
            client.create(rootPath, new Integer(parties), CreateMode.PERSISTENT);
        }
    }
    
    /**
     * 执行等待
     */
    @Override
    public void doAwait() {
        client.create(rootPath + "/barrier_", new String(rootPath + "/barrier_"), CreateMode.EPHEMERAL_SEQUENTIAL);
        
        client.subscribeChildChanges(rootPath, new IZkChildListener() {
            
            @Override
            public void handleChildChange(String parentPath, List<String> childList) throws Exception {
                if (childList != null && childList.size() >= parties) {
                    cdl.countDown();
                }
            }
        });
        
        int count = client.getChildren(rootPath).size();
        
        try {
            if (count < parties) {
                cdl.await();
            } else {
                breakBarrier();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
    private void breakBarrier() {
        client.unsubscribeAll();
        client.deleteRecursive(rootPath);
        client.delete(rootPath);
        client.close();
    }

测试:

   public static void main(String[] args) {
        ZkBarrier zb = new ZkBarrier("ip:2181", 4000, "/zk-barrier", 3);
        zb.connect();
        zb.createRootNode();
        
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + "已到达");
                    zb.doAwait();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }, "Thread1");
        
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + "已到达");
                    zb.doAwait();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }, "Thread2");
        
        t1.start();
        t2.start();
        
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        Thread t3 = new Thread(new Runnable() {

            @Override
            public void run() {
                
                try {
                    System.out.println(Thread.currentThread().getName() + "已到达");
                    zb.doAwait();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }, "Thread3");
        
        t3.start();
        
    }

结果:

Thread1已到达
Thread2已到达
Thread3已到达

问题:

  1. 为什么使用ZkClient客户端插件而不是原生态Zookeeper
    用原生态API实现花费很长时间,踩到了很多坑:
    ①监听事件只能执行一次,然后就失效了。
    ②监听事件不生效,排查也花了一段时间。
    ③原生态API中很多的异常处理,NoNode或者NodeExists等异常,这些异常处理很麻烦。
  2. 让线程的等待,我思考了很久,是采用死循环轮询,还是让其阻塞。最后我选择了阻塞。他们之间的区别还在思考。

相关文章

网友评论

      本文标题:zookeeper实现分布式屏障

      本文链接:https://www.haomeiwen.com/subject/bokkahtx.html