美文网首页
zookeeper手把手教程(四)

zookeeper手把手教程(四)

作者: 黑白蓝调 | 来源:发表于2017-10-22 22:48 被阅读0次

    1. 分布式应用场景

    前面讲过分布式应用场景有:

    • 负载均衡
    • 分布式ID生成
    • 分布式锁
    • 分布式队列
    • 消息订阅发布
    • 命名服务
    • master选举

    负载均衡

    请求分摊

    master选举

    package com.frame.test.gp.zookeeperAPI.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
    import org.apache.curator.framework.state.ConnectionState;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/12 20:12
     */
    public class MasterSelector {
    
        private final static String MASTER_PATH = "/curator_master_path";
    
        public static void main(String[] args) {
            CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
            LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListener() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("获得leader成功");
                    TimeUnit.SECONDS.sleep(2);
                }
    
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
    
                }
            });
            leaderSelector.autoRequeue();
            leaderSelector.start(); //开始选举
        }
    }
    
    

    消息订阅发布

    实现配置信息的集中式管理和数据的动态更新,典型场景disconf

    实现配置中心有两种模式:push 、pull。
    长轮训
    zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端
    发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据

    1. 数据量比较小
    2. 数据内容在运行时会发生动态变更
    3. 集群中的各个机器共享配置

    分布式ID生成

    利用zookeeper中的顺序节点的特性,制作分布式的序列号生成器(ID生成器),(在往数据库查询数据时,通常需要一个id,在单机环境下,可以利用数据库的自动成功id号,但是这种在分布式环境下就无法使用了,可以使用UUID,但是UUID有一个缺点,就是没有规律很难理解。使用zookeeper的命名服务可以生成有顺序的容易理解的,支持分布式的编号)

    package com.frame.test.gp.zookeeperAPI.GenId;
    
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/13 0:16
     * 分布式ID生成器,实际上是产生持久化有序节点,然后获取有序节点,作为ID
     */
    public class IdMark {
        private ZkClient zkClient;
    
        private final String server;  //记录服务器的地址
        private final String root;    //记录父节点的路径
        private final String nodeName;  //几点名称
        private volatile boolean runing = false;
        private ExecutorService cleanExector;
    
        public enum RemoveMethod {
            NONE,  //不
            IMMEDIATELY,  //立即
            DELAY   //延期
        }
    
        public IdMark(String server, String root, String nodeName) {
            this.server = server;
            this.root = root;
            this.nodeName = nodeName;
        }
    
        public void start() throws Exception {
            if (runing) {
                throw new Exception("server has started ...");
            }
            runing = true;
    
            init();
        }
    
        public void stop() throws Exception {
            if (!runing) {
                throw new Exception("server has stopped ...");
            }
            runing = false;
    
            freeResource();
        }
    
        /**
         * 初始化服务器资源
         */
        private void init() {
            zkClient = new ZkClient(server, 5000, 5000, new BytesPushThroughSerializer());
            cleanExector = Executors.newFixedThreadPool(10);
            try {
                zkClient.createPersistent(root, true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 释放资源
         */
        private void freeResource() {
            cleanExector.shutdown();
            try {
                cleanExector.awaitTermination(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                cleanExector = null;
            }
            if (zkClient != null) {
                zkClient.close();
                zkClient = null;
            }
    
        }
    
        /**
         * 检查服务是否运行
         *
         * @throws Exception
         */
        public void checkRunning() throws Exception {
            if (!runing) {
                throw new Exception("请先调用start ");
            }
        }
    
        private String extractId(String str) {
            int index = str.lastIndexOf(nodeName);
            if (index >= 0) {
                index += nodeName.length();
                return index <= str.length() ? str.substring(index) : "";
            }
            return str;
        }
    
        /**
         * 获取ID
         *
         * @param removeMethod
         * @return
         */
        public String generateId(RemoveMethod removeMethod) throws Exception {
            checkRunning();
            final String fullNodePath = root.concat("/").concat(nodeName);
            //创建顺序节点每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。
            //基于这个特性,在创建子节点的时候,可以设置这个属性,呢么在创建节点过程,
            //Zookeeper会自动为给定节点名添加后缀,作为新节点名
            final String outPath = zkClient.createPersistentSequential(fullNodePath, null);
            if (removeMethod.equals(RemoveMethod.IMMEDIATELY)) {  //立即删除
                zkClient.deleteRecursive(outPath);
            } else if (removeMethod.equals(RemoveMethod.DELAY)) { //延期删除
                cleanExector.execute(new Runnable() {
                    @Override
                    public void run() {
                        zkClient.delete(outPath);
                    }
                });
            }
            return extractId(outPath);
        }
    
        public static void main(String[] args) throws Exception {
            IdMark idMaker=null;
            try {
                 idMaker = new IdMark("192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181",
                        "/NameService/IdGen", "ID-");
                 idMaker.start();
                for (int i = 0; i < 2; i++) {
                    String id = idMaker.generateId(RemoveMethod.DELAY);
                    System.out.println(id);
                }
            }finally {
                idMaker.stop();
            }
        }
    }
    
    

    分布式队列生成

    package com.frame.test.gp.zookeeperAPI.queue;
    
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/12 19:25
     */
    public class DistributedSimpleQueue<T> {
        private final ZkClient zkClient;
        private final String root;
        private static final String node_name = "n_";
    
        public DistributedSimpleQueue(ZkClient zkClient, String root) {
            this.zkClient = zkClient;
            this.root = root;
        }
    
        //获取队列的大小
        public int size() {
            //获取根节点下的所有子节点
            return zkClient.getChildren(root).size();
        }
    
        //判断队列是否为空
        public boolean isEmpty() {
            return size() == 0;
        }
    
        //存入队列
        public boolean offer(T element) {
            try {
                String nodeFullPath = root.concat("/").concat(node_name);
                zkClient.createPersistentSequential(nodeFullPath, element);
            } catch (ZkNoNodeException e) {
                zkClient.createPersistent(root);
                offer(element);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return true;
        }
    
        //从队列中取出元素
        public T poll() {
            try {
                List<String> list = zkClient.getChildren(root);
                if (list.size() == 0) {
                    return null;
                }
                //将队列由小到大的顺序排序
                Collections.sort(list, new Comparator<String>() {
                    public int compare(String lhs, String rhs) {
                        return getNodeNumber(lhs, node_name).compareTo(getNodeNumber(rhs, node_name));
                    }
                });
    
                /**
                 * 将队列中的元素做循环,然后构建完整的路径,在通过这个路径去读取数据
                 */
                for (String nodeName : list) {
                    String nodeFullPath = root.concat("/").concat(nodeName);
                    try {
                        T node = zkClient.readData(nodeFullPath);
                        zkClient.delete(nodeFullPath);
                        return node;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public String getNodeNumber(String str, String nodeName) {
            int index = str.lastIndexOf(nodeName);
            if (index >= 0) {
                index += node_name.length();
                return index <= str.length() ? str.substring(index) : "";
            }
            return str;
        }
    }
    
    
    
    package com.frame.test.gp.zookeeperAPI.queue;
    
    import com.frame.entity.Admin;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.SerializableSerializer;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/12 22:33
     */
    public class QueueTest {
        private final static String CONNECTSTRING = "192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181";
    
        public static void main(String[] args) {
            ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000, 5000, new SerializableSerializer());
            zkClient.deleteRecursive("/queue");
    
            DistributedSimpleQueue<Admin> adminDistributedSimpleQueue = new DistributedSimpleQueue<>(zkClient, "/queue");
            for (int i = 0; i < 10; i++){
                Admin admin=new Admin();
                admin.setAdminName("队列"+i);
                admin.setAdminId((long) i);
                adminDistributedSimpleQueue.offer(admin);
                System.out.println("size: "+adminDistributedSimpleQueue.size());
            }
    
            Admin admin=adminDistributedSimpleQueue.poll();
            System.out.println(admin.getAdminName());
        }
    }
    
    

    分布式锁

    package com.frame.test.gp.zookeeperAPI.javaapilock;
    
    import com.frame.test.gp.thread.thread15.Lock;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.Random;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/10 23:10
     */
    public class DistributeLock {
        private static final String ROOT_LOCKS = "/LOCKS";  //根节点
    
        private ZooKeeper zooKeeper;
    
        private int sessionTimeOut; //会话超时时间
    
        private String lockID; //记录锁节点ID
    
        public static final byte[] data = {1, 2}; //节点数据
    
        public CountDownLatch countDownLatch = new CountDownLatch(1);
    
        public DistributeLock() throws IOException, InterruptedException {
            this.zooKeeper = ZKClientUtils.getInstance();
            this.sessionTimeOut = ZKClientUtils.getSessionTimeOut();
        }
    
        //获取锁的方法,有序节点最小的获得锁
        public boolean lock() {
            try {
                lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                System.out.println(Thread.currentThread().getName() + "->成功创建了lock节点[" + lockID + "],开始竞争锁;");
    
                List<String> childreaNodes = zooKeeper.getChildren(ROOT_LOCKS, true); //获取根节点下的所有子节点
                //排序 从小到大
                SortedSet<String> sortedSet = new TreeSet<String>();
                for (String children : childreaNodes) {
                    sortedSet.add(ROOT_LOCKS + "/" + children);
                }
                String first = sortedSet.first();
                if (lockID.equalsIgnoreCase(first)) {
                    //表示当前就是最小的节点
                    System.out.println(Thread.currentThread().getName() + "->成功获得锁,lock节点为:[" + lockID + "]");
                    return true;
                }
                //获得锁的下一个节点 监控锁节点,当锁释放之后下一个节点获取锁
                SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);  //handSet 返回从开始节点到指定元素的集合
                if (!lessThanLockId.isEmpty()) {
                    String prevLockID = lessThanLockId.last(); //拿到比当前LOCKID这个节点更小的上一个节点
                    zooKeeper.exists(prevLockID, new LockWatcher(countDownLatch));
                    countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);
                    //上面这段代码意味着如果会话超时或者节点被删除(释放)
                    System.out.println(Thread.currentThread().getName() + "成功获取锁:[" + lockID + "];下一个监控节点:"+prevLockID);
                }
                return true;
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return false;
        }
    
        public boolean unlock() {
            System.out.println(Thread.currentThread().getName() + "->开始释放锁:[" + lockID + "]");
            try {
                zooKeeper.delete(lockID, -1);
                System.out.println("节点[" + lockID + "]成功被删除");
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        public static void main(String[] args) {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            Random random = new Random();
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    DistributeLock lock = null;
                    try {
                        lock = new DistributeLock();
                        countDownLatch.countDown();
                        countDownLatch.await();
                        lock.lock();
                        Thread.sleep(random.nextInt(500));
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        if (lock != null) {
                            lock.unlock();
                        }
                    }
    
                }).start();
            }
        }
    
    }
    
    
    package com.frame.test.gp.zookeeperAPI.javaapilock;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/10 23:41
     */
    public class LockWatcher implements Watcher{
    
        private CountDownLatch countDownLatch;
    
        public LockWatcher(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType()== Event.EventType.NodeDeleted){
                countDownLatch.countDown();
            }
        }
    }
    
    
    package com.frame.test.gp.zookeeperAPI.javaapilock;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author Administrator
     * @CREATE 2017/8/10 23:04
     */
    public class ZKClientUtils {
        private final static String CONNECTSTRING = "192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181";
    
        public static int getSessionTimeOut() {
            return sessionTimeOut;
        }
    
        public static final int sessionTimeOut = 5000;
    
        //获取链接
        public static ZooKeeper getInstance() throws IOException, InterruptedException {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeOut, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState()== Event.KeeperState.SyncConnected){
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            return zooKeeper;
        }
    }
    
    

    相关文章

      网友评论

          本文标题:zookeeper手把手教程(四)

          本文链接:https://www.haomeiwen.com/subject/paonuxtx.html