美文网首页
zookeeper day2

zookeeper day2

作者: 巴巴11 | 来源:发表于2020-05-18 22:14 被阅读0次

    1 zk 事件监听机制

    1.1 watcher概念

    zk 提供了消息发布/消息订阅功能。多个订阅者同时监听某一个主题对象。当该主题对象发生改变(节点内容改变,子节点列表改变等),会实时、主动通知所有订阅者。

    zk采用Watcher机制来实现发布订阅功能。
    该机制会在主题对象发生变化后异步通知客户端,因此客户端不必在订阅后轮询阻塞,从而减轻客户端压力。

    watcher机制与观察者模式类似,可以看作是观察者模式在分布式场景中的实现。

    1.2 watcher架构
    由三部分组成:

    • zk 服务端
    • zk 客户端
    • 客户端的ZKWatchManager对象

    1 客户端首先将watch注册到服务端,同时将watcher对象保存到客户端的watcher管理器中。
    2 当zk服务端监听的数据状态发生变化后,服务端会主动通知客户端。
    3 接着客户端的watcher管理器会触发相关的watcher来回调相应的处理逻辑。

    image.png image.png

    1.4 watcher的接口设计

    Watcher是一个接口,任何实现了接口的类就是一个新的watcher。watcher内部包含了两个枚举类: KeeperState、EventType

    image.png image.png image.png
    image.png image.png
    package watcher;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.concurrent.CountDownLatch;
    
    public class ZkConnectionWatcher implements Watcher {
        final static CountDownLatch latch = new CountDownLatch(1);
        static ZooKeeper zk;
    
        public static void main(String[] args) {
            try {
                zk = new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
                // 阻塞等待连接创建
                latch.countDown();
                // 会话id
                System.out.println(zk.getSessionId());
    
                // 验证鉴权失败的事件监听
                // 添加授权用户
                zk.addAuthInfo("digest", "wh:12344".getBytes());
                byte[] res = zk.getData("/wh/node1", false, null);
                System.out.println(new String(res));
                Thread.sleep(10000);
                zk.close();
                System.out.println("all done...");
            } catch (Exception e) {
    
            }
        }
    
        public void process(WatchedEvent event) {
            try {
                // 监听连接事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("conncetioned ...");
                        latch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("connection duakai...");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("connection timeout");
                        zk = new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
    
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    }
                }
            } catch (Exception e) {
    
            }
        }
    }
    
    
    image.png
    package set;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * JUnit demonstrations of registering watchers through
     * {@code ZooKeeper.exists} on the node /wh/node1.
     *
     * <p>Every test registers its watcher(s) and then sleeps for ten seconds so
     * that a change made to the node from another client can be observed.
     */
    public class ZkSet1 {
        ZooKeeper zk;

        /** Prints the path and the type carried by a watch event. */
        private static void dump(WatchedEvent event) {
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        }

        /** Keeps the test alive for ten seconds, then reports completion. */
        private static void linger() throws InterruptedException {
            Thread.sleep(10000);
            System.out.println("all done...");
        }

        /** Opens the session and blocks until it reports SyncConnected. */
        @Before
        public void before() throws IOException, InterruptedException {
            final CountDownLatch connected = new CountDownLatch(1);
            Watcher sessionWatcher = new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Event.KeeperState.SyncConnected)) {
                        System.out.println("connectioned ...");
                        connected.countDown();
                    }
                    dump(event);
                }
            };
            zk = new ZooKeeper("localhost:2181", 5000, sessionWatcher);
            connected.await();
        }

        /** Closes the session after each test. */
        @After
        public void after() throws InterruptedException {
            zk.close();
        }

        /** Watches the node with the watcher that was passed to the constructor. */
        @Test
        public void watcherExist1() throws KeeperException, InterruptedException {
            // Second argument true: use the connection's default watcher.
            zk.exists("/wh/node1", true);
            linger();
        }

        /** Watches the node with a custom watcher supplied inline. */
        @Test
        public void watcherExist2() throws KeeperException, InterruptedException {
            Watcher custom = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("自定义的watcher...");
                    dump(event);
                }
            };
            // Second argument: the custom watcher object.
            zk.exists("/wh/node1", custom);
            linger();
        }

        /** Demonstrates that a registered watcher is one-shot. */
        @Test
        public void watcherExist3() throws KeeperException, InterruptedException {
            Watcher oneShot = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("自定义的watcher...");
                    dump(event);
                }
            };
            // Second argument: the custom watcher defined above.
            zk.exists("/wh/node1", oneShot);
            linger();
        }

        /**
         * Works around the one-shot behaviour: the watcher registers itself
         * again each time it is invoked, so it can be used repeatedly.
         */
        @Test
        public void watcherExist4() throws KeeperException, InterruptedException {
            Watcher selfRenewing = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("自定义的watcher...");
                    dump(event);
                    try {
                        // Re-arm the watch with this same watcher instance.
                        zk.exists("/wh/node1", this);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // Second argument: the custom watcher defined above.
            zk.exists("/wh/node1", selfRenewing);
            linger();
        }

        /** Registers two independent watchers on the same node. */
        @Test
        public void watcherExist5() throws KeeperException, InterruptedException {
            Watcher first = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("自定义的watcher1...");
                    dump(event);
                }
            };
            Watcher second = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("自定义的watcher2...");
                    dump(event);
                }
            };
            zk.exists("/wh/node1", first);
            zk.exists("/wh/node1", second);
            linger();
        }

    }
    
    
    image.png
    类似1.6.1 exists
    
    image.png image.png
    类似1.6.1 exists
    
    image.png
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import watcher.ZkConnectionWatcher;
    
    import java.util.concurrent.CountDownLatch;
    
    public class ZkConfigCenter implements Watcher  {
    
        final static CountDownLatch latch = new CountDownLatch(1);
        static ZooKeeper zk;
    
        private String url;
        private String name;
        private String pwd;
    
    
    
        public void process(WatchedEvent event) {
            try {
                // 监听连接事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("conncetioned ...");
                        latch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("connection duakai...");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("connection timeout");
                        zk = new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
    
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    } else if (event.getType() == Event.EventType.NodeDataChanged) {
                        initValue();
                    }
                }
            } catch (Exception e) {
    
            }
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            ZkConfigCenter configCenter = new ZkConfigCenter();
            for (int i=0;i<10;i++) {
                Thread.sleep(5000);
                System.out.println("url = " + configCenter.getUrl());
                System.out.println("name = " + configCenter.getName());
                System.out.println("pwd = " + configCenter.getPwd());
            }
        }
    
        public ZkConfigCenter() {
            initValue();
        }
    
        public void initValue() {
            try {
                zk = new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
                String url = new String(zk.getData("/wh/config/url", true, null));
                String name = new String(zk.getData("/wh/config/name", true, null));
                String pwd = new String(zk.getData("/wh/config/pwd", true, null));
            } catch (Exception e) {
    
            }
        }
    
    
    
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getPwd() {
            return pwd;
        }
    
        public void setPwd(String pwd) {
            this.pwd = pwd;
        }
    }
    
    
    image.png
    import org.apache.zookeeper.*;
    import watcher.ZkConnectionWatcher;
    
    import java.util.concurrent.CountDownLatch;
    
    public class GlobalUniqueId implements Watcher {
        final static CountDownLatch latch = new CountDownLatch(1);
        static ZooKeeper zk;
    
        private String nodePath = "/uniqueid";
    
        public void process(WatchedEvent event) {
            try {
                // 监听连接事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("conncetioned ...");
                        latch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("connection duakai...");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("connection timeout");
                        zk = new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
    
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    } else if (event.getType() == Event.EventType.NodeDataChanged) {
                    }
                }
            } catch (Exception e) {
    
            }
        }
    
    
        public GlobalUniqueId() {
            try {
                zk =  new ZooKeeper("localhost:2181", 5000, new ZkConnectionWatcher());
                latch.countDown();
            } catch (Exception ex) {
    
            }
        }
    
        public String getUniqueId() {
            String id = "";
            try {
                // 创建临时有序节点
                id = zk.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                // /uniqueid000000001
                id = id.substring(9);
            } catch (Exception ex) {
    
            }
            return id;
        }
    
        public static void main(String[] args) {
            GlobalUniqueId globalUniqueId = new GlobalUniqueId();
            for (int i=0;i<10;i++) {
                System.out.println(globalUniqueId.getUniqueId());
            }
        }
    
    }
    
    
    image.png
    import javafx.scene.SubScene;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 利用临时有序节点来实现分布式锁
     */
    public class ZkLock {
    
        // zk url
        String zkUrl = "localhost:2181";
    
        final CountDownLatch latch = new CountDownLatch(1);
        // zk 配置信息
        ZooKeeper zooKeeper;
        private static  final String LOCK_NODE_PATH = "/Locks";
        private static  final String LOCK_NODE_NAME = "/Lock_";
        private String lockPath;
    
        public ZkLock () {
            try {
                zooKeeper = new ZooKeeper(zkUrl, 5000, new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.None) {
                            System.out.println("connectioned ....");
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (Exception e) {
    
            }
        }
    
        // 获取锁
        public void tryAccquireLock() throws KeeperException, InterruptedException {
            // 创建锁节点
            createLock();
            // 尝试获取锁
            accquireLock();
        }
    
        // 创建锁节点
        public void createLock() throws KeeperException, InterruptedException {
            // 1
            Stat stat = zooKeeper.exists(LOCK_NODE_PATH, false);
            if (stat == null) {
                zooKeeper.create(LOCK_NODE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
            // 2 创建临时有序节点
            lockPath = zooKeeper.create(LOCK_NODE_PATH + "/" + LOCK_NODE_NAME,
                    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    
        // 监视上一个节点是否被删除
        Watcher watcher = new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            }
        };
    
    
         // 尝试获取锁
        public void accquireLock() throws InterruptedException, KeeperException {
            // 获取Locks下的所有子节点
            List<String> list = zooKeeper.getChildren(LOCK_NODE_PATH, false);
            Collections.sort(list);
            // Locks/Lock_000000001
            int index = list.indexOf(lockPath.substring(LOCK_NODE_PATH.length()+1));
            if(index == 0) {
                System.out.println("拿到锁");
            } else {
                // 上一个节点的路径
                String path = list.get(index -1);
                Stat stat = zooKeeper.exists(LOCK_NODE_PATH + "/" + path, watcher);
                if (stat == null) {
                    accquireLock();
                } else {
                    synchronized (watcher) {
                        watcher.wait();
                    }
                    accquireLock();
                }
            }
        }
    
        // shifang suo
        public void releaseLock() throws InterruptedException, KeeperException {
            zooKeeper.delete(this.lockPath, -1);
            zooKeeper.close();
            System.out.println("锁已经释放" + this.lockPath);
        }
    }
    
    
    
    
    import org.apache.zookeeper.KeeperException;
    import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
    
    /**
     * Exercises the distributed lock: every sale runs while holding a ZkLock.
     */
    public class TickSeller {
        /** Simulates a slow business operation by sleeping for five seconds. */
        private void sell() {
            System.out.println("sell begin...");
            // Fixed 5000 ms pause standing in for a time-consuming operation
            int mils = 5000;
            try {
                // Represents the execution of complex logic
                Thread.sleep(mils);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println("sell end...");
        }

        /**
         * Acquires the lock, performs one sale and releases the lock.
         *
         * @throws KeeperException if a ZooKeeper operation fails
         * @throws InterruptedException if the thread is interrupted
         */
        public void sellWithLock() throws KeeperException, InterruptedException {
            ZkLock lock= new ZkLock();
            // Acquire the lock
            lock.tryAccquireLock();
            try {
                sell();
            } finally {
                // Release even if the sale fails, so other contenders are not
                // left waiting for this one.
                lock.releaseLock();
            }
        }

        /**
         * Runs ten sales in sequence, each under the lock.
         *
         * @param args unused
         * @throws KeeperException if a ZooKeeper operation fails
         * @throws InterruptedException if the thread is interrupted
         */
        public static void main(String[] args) throws KeeperException, InterruptedException {
            TickSeller seller = new TickSeller();
            for (int i=0;i<10;i++) {
                seller.sellWithLock();
            }
        }
    }
    
    

    2 zk 集群搭建

    image.png image.png image.png

    3 一致性协议 Zab协议

    image.png
    image.png image.png image.png

    4 zk的leader选举

    image.png
    image.png
    image.png

    5 observer角色及其配置

    image.png

    6 Java API连接zk集群

    image.png
    image.png

    相关文章

      网友评论

          本文标题:zookeeper day2

          本文链接:https://www.haomeiwen.com/subject/isrrohtx.html