美文网首页大家一起来学Zookeeper机器学习我是程序员
【从入门到放弃-ZooKeeper】ZooKeeper实战-分布

【从入门到放弃-ZooKeeper】ZooKeeper实战-分布

作者: 阿里云云栖号 | 来源:发表于2019-09-18 15:42 被阅读0次

    前言

    上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
    接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

    设计

    我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
    主要重写offer、poll、peek、size方法。
    我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
    offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
    poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
    peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
    size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

    DistributedQueue

    //继承AbstractQueue类并实现Queue接口
    public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
        private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);
    
        //ZooKeeper客户端,进行ZooKeeper操作
        private ZooKeeper zooKeeper;
    
        //根节点名称
        private String dir;
    
        //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
        private String node;
    
        //ZooKeeper鉴权信息
        private List<ACL> acls;
    
        /**
         * Constructor.
         *
         * @param zooKeeper the zoo keeper
         * @param dir       the dir
         * @param node      the node
         * @param acls      the acls
         */
       public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
            this.zooKeeper = zooKeeper;
            this.dir = dir;
            this.node = node;
            this.acls = acls;
            init();
        }
    
        private void init() {
            //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
            try {
                Stat stat = zooKeeper.exists(dir, false);
                if (stat == null) {
                    zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
                }
            } catch (Exception e) {
                logger.error("[DistributedQueue#init] error : " + e.toString(), e);
            }
        }
    }
    

    offer

    /**
     * Offer boolean.
     *
     * @param o the o
     * @return the boolean
     */
    @Override
    public boolean offer(E o) {
        //构建要插入的节点名称
        String fullPath = dir.concat("/").concat(node);
        try {
            //创建子节点成功则返回入队成功
          zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
            return true;
        } catch (Exception e) {
            logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
        }
        return false;
    }
    

    poll

    /**
     * Poll e.
     *
     * @return the e
     */
    @Override
    public E poll() {
        try {
            //获取根节点所有子节点信息。
            List<String> children = zooKeeper.getChildren(dir, null);
            //如果队列是空的则返回null
            if (children == null || children.isEmpty()) {
                return null;
            }
    
            //将子节点名称排序
            Collections.sort(children);
            for (String child : children) {
                //拼接子节点的具体名称
                String fullPath = dir.concat("/").concat(child);
                try {
                    //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                    byte[] bytes = zooKeeper.getData(fullPath, false, null);
                    E data = (E) bytesToObject(bytes);
                    zooKeeper.delete(fullPath, -1);
                    return data;
                } catch (Exception e) {
                    logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
                }
            }
    
        } catch (Exception e) {
            logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
        }
    
        return null;
    }
    

    peek

    /**
     * Peek e.
     *
     * @return the e
     */
    @Override
    public E peek() {
    
        try {
            //获取根节点所有子节点信息。
            List<String> children = zooKeeper.getChildren(dir, null);
            //如果队列是空的则返回null
            if (children == null || children.isEmpty()) {
                return null;
            }
    
            //将子节点名称排序
            Collections.sort(children);
    
            for (String child : children) {
                //拼接子节点的具体名称
                String fullPath = dir.concat("/").concat(child);
                try {
                    //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                    byte[] bytes = zooKeeper.getData(fullPath, false, null);
                    E data = (E) bytesToObject(bytes);
                    return data;
                } catch (Exception e) {
                    logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
                }
            }
    
        } catch (Exception e) {
            logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
        }
    
        return null;
    }
    

    size

    /**
     * Size int.
     *
     * @return the int
     */
    @Override
    public int size() {
        try {
            //获取根节点的子节点名称
            List<String> children = zooKeeper.getChildren(dir, null);
            //返回子结点信息数量
            return children.size();
        } catch (Exception e) {
            logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
        }
    
        return 0;
    }
    

    总结

    上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr。如果有好的优化建议,欢迎一起讨论。



    本文作者:aloof_

    阅读原文

    本文为云栖社区原创内容,未经允许不得转载。

    相关文章

      网友评论

        本文标题:【从入门到放弃-ZooKeeper】ZooKeeper实战-分布

        本文链接:https://www.haomeiwen.com/subject/veahuctx.html