美文网首页
利用zookeeper实现分布式队列

利用zookeeper实现分布式队列

作者: 守住阳光 | 来源:发表于2018-08-30 16:30 被阅读0次

    一、zookeeper介绍

            zookeeper是源代码开放的分布式协调服务,由雅虎创建,是Google的开源实现。zookeeper是一个高性能的分布式数据一致性解决方案,它将那些复杂的、容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并提供一系列简单易用的接口给用户使用。

            其本身提供了一致性保证,特点如下:

            (1)顺序一致性:客户端的更新顺序与它们被发送的顺序一致。

            (2)原子性:更新操作要么成功,要么失败,没有第三种结果。

            (3)单系统镜像:无论客户端连接到哪一个服务器,他都将看到相同的zookeeper视图。

            (4)可靠性:一旦一个更新操作被应用,那么在客户端再次更新之前,其值不会再改变。

    二、应用场景

            zookeeper可以应用于很多的分布式服务场景,包括:集群管理,master选举,发布/订阅,分布式锁,分布式队列,分布式命名服务,服务注册于发现,负载均衡等等。下面一个例子介绍zookeeper如何实现分布式队列。

    三、zookeeper分布式队列实现

            zookeeper分布式队列的实现完成以下几个要素:

            (1)、数据入队,在一个节点下创建有序子节点,节点中设置需要入队的数据,完成数据的入队操作。

            (2)、数据出队,取出该节点下的所有子节点,如果数量不为0,取出一个子节点,并将子节点删除。

            (3)、提供判断是否有数据等的api。

            下面为具体代码实现

    1、DistributedSimpleQueue类

    public class DistributedSimpleQueue{

            protected final ZkClient zkClient;

            protected final String root;

            protected static final String Node_NAME = "n_";

            public DistributedSimpleQueue(ZkClient zkClient, String root) {

                    this.zkClient = zkClient;

                    this.root = root;

             }

            public int size() {

                return zkClient.getChildren(root).size();

            }

            public boolean isEmpty() {

                return zkClient.getChildren(root).size() == 0;

            } 

            public boolean offer(T element) throws Exception{ 

                    String nodeFullPath = root .concat( "/" ).concat( Node_NAME );

                     try { 

                             zkClient.createPersistentSequential(nodeFullPath , element); 

                     }catch (ZkNoNodeException e) { 

                             zkClient.createPersistent(root); 

                             offer(element); 

                     } catch (Exception e) { 

                         throw ExceptionUtil.convertToRuntimeException(e); 

                     } 

                     return true; 

                 }

                @SuppressWarnings("unchecked")

                public T poll() throws Exception {

                       try {

                               List  list = zkClient.getChildren(root);

                               if (list.size() == 0) {

                                    return null;

                                }

                                Collections.sort(list, new Comparator() {

                                        public int compare(String lhs, String rhs) {

                                                return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

                                        }

                                });

                                for ( String nodeName : list ){

                                        String nodeFullPath = root.concat("/").concat(nodeName);

                                        try {

                                                T node = (T) zkClient.readData(nodeFullPath);

                                                zkClient.delete(nodeFullPath);

                                                return node;

                                           } catch (ZkNoNodeException e) {

                                                    // ignore

                                            }

                                }

                                return null;

                        } catch (Exception e) {

                                throw ExceptionUtil.convertToRuntimeException(e);

                        }

                }

                private String getNodeNumber(String str, String nodeName) {

                        int index = str.lastIndexOf(nodeName);

                        if (index >= 0) {

                            index += Node_NAME.length();

                            return index <= str.length() ? str.substring(index) : "";

                        }

                        return str;

                }

    }

    2、阻塞队列实现类

    public class DistributedBlockingQueue  extends DistributedSimpleQueue{ 

             public DistributedBlockingQueue(ZkClient zkClient, String root) { 

                     super(zkClient, root);

            } 

            @Override

            public T poll() throws Exception {

                    while (true){

                            final CountDownLatch latch = new CountDownLatch(1);

                            final IZkChildListener childListener = new IZkChildListener() {

                                    public void handleChildChange(String parentPath, List currentChilds) throws Exception {

                                            latch.countDown();

                                    }

                            };

                            zkClient.subscribeChildChanges(root, childListener);

                            try{

                                    T node = super.poll();

                                    if ( node != null ){

                                            return node;

                                    }else{

                                            latch.await();

                                    }

                            }finally{

                                    zkClient.unsubscribeChildChanges(root, childListener);

                            }

                    }

                }

    }

            阻塞队列的实现利用了CountDownLatch 的特性。当子节点数量为0时,即队列中没有元素,这是线程在此等待,同时监听子节点的变化,如果有数据入队,则从等待返回,取出数据。

    3、测试类

    public class TestDistributedBlockingQueue {

                public static void main(String[] args) {

                        ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);

                        int delayTime = 5;

                        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());

                        final DistributedBlockingQueuequeue = new DistributedBlockingQueue(zkClient,"/Queue");

                        final User user1 = new User();

                        user1.setId("1");

                        user1.setName("xiao wang");

                        final User user2 = new User();

                        user2.setId("2");

                        user2.setName("xiao wang");

                        try {

                                delayExector.schedule(new Runnable() {

                                        public void run() {

                                             try {

                                                    queue.offer(user1);

                                                    queue.offer(user2);

                                                } catch (Exception e) {

                                                        e.printStackTrace();

                                                }

                                            }

                                   }, delayTime , TimeUnit.SECONDS);

                                   System.out.println("ready poll!");

                                    User u1 = (User) queue.poll();

                                    User u2 = (User) queue.poll();

                                    if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){

                                            System.out.println("Success!");

                                    }

                        } catch (Exception e) {

                                e.printStackTrace();

                        } finally{

                                    delayExector.shutdown();

                        try {

                                    delayExector.awaitTermination(2, TimeUnit.SECONDS);

                        } catch (InterruptedException e) {

                        }

                }

            }

    }

    相关文章

      网友评论

          本文标题:利用zookeeper实现分布式队列

          本文链接:https://www.haomeiwen.com/subject/vkcdwftx.html