美文网首页
5分钟学习zookeeper:Curator Recipes

5分钟学习zookeeper:Curator Recipes

作者: JerrysCode | 来源:发表于2021-02-15 14:04 被阅读0次

    Curator针对zookeeper的常见应用场景,提供了方便的Recipes(菜谱),帮助用户快速高效地完成代码开发。如:Leader选举,分布式锁,Barriers,分布式队列等。

    zookeeper 数据模型是一个类似文件系统的树形结构,每一个节点叫做znode。各种使用场景本质都是对znode的增删改查。

    Leader选举

    分布式系统有多个节点,刚开始所有节点通过leader选举选定其中一个节点为leader。如果leader节点宕机则重新选举新的leader。

    leader选举原理

    多个客户端创建同名的临时znode,只有一个客户端可以创建成功。创建znode成功的客户端获得锁。释放锁时删除znode,其他客户端监听到变化就可以重新竞争锁。
    Curator框架提供了LeaderLatch和LeaderSelector两种leader选举方式

    LeaderLatch

    接下来的代码模拟10个节点的leader选举情况。核心业务逻辑是:

    1. 创建LeaderLatch
    new LeaderLatch(client, "/master", "node" + i, LeaderLatch.CloseMode.NOTIFY_LEADER)
    
    1. 给每个LeaderLatch添加监听器LeaderLatchListener,当被选举为leader时触发isLeader函数回调,当失去leader角色后会触发notLeader函数回调
    for (int i = 0; i < 10; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .retryPolicy(retryPolicy)
                        .connectionTimeoutMs(CONNECTION_TIMEOUT)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .build();
                client.start();
                LeaderLatch latch = new LeaderLatch(client, "/master", "node" + i, LeaderLatch.CloseMode.NOTIFY_LEADER);
                latch.addListener(new LeaderLatchListener() {
    
                    @Override   //被选举为leader时,回调调用此方法
                    public void isLeader() {
                        System.out.println(latch.getId() + " follower -> leader, hasLeadership: " + latch.hasLeadership());
    
                    }
    
                    /*如果失去Leader则回调此方法,必须设置LeaderLatch.CloseMode.NOTIFY_LEADER才会触发,否则不触发
                    失去Leader的场景:自身close退出,和zk server的连接断开
                     */
                    @Override
                    public void notLeader() {
                        System.out.println(latch.getId() + " lost leader: " + latch.hasLeadership());
    
                    }
                });
                latches.add(latch);
            }
    
    1. 通过latch.start()开始竞选
    for (LeaderLatch latch :latches){
                new Thread(() -> {
                    try {
                        latch.start();  //开始竞选
                        System.out.println("latch start.." + latch.getId());
    
                        latch.await();  //等待直到被选举为Leader
                        System.out.println("latch await.." + latch.getId());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    

    LeaderSelector

    接下来的代码模拟10个节点的leader选举。

    1. 创建LeaderSelector,创建时指定LeaderSelectorListener监听器。当被选举为leader或者leader节点发生变化时回调相应的方法。
    public LeaderSelector(CuratorFramework client,String mutexPath,LeaderSelectorListener listener)
    
    1. 通过 selector.start()开始leader竞选。
    List<LeaderSelector> selectors = new ArrayList<>();
            List<CuratorFramework> clients = new ArrayList<>();
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .retryPolicy(retryPolicy)
                        .connectionTimeoutMs(CONNECTION_TIMEOUT)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .build();
                client.start();
                clients.add(client);
            }
    
            for (CuratorFramework client : clients){
                LeaderSelector selector = new LeaderSelector(client, "/master", new LeaderSelectorListenerAdapter() {
    
                    @Override  //获取Leader权限后的处理逻辑,处理完成后自动释放Leader权限
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(client.toString() + "  hasLeadership" );
                        Thread.sleep(2000);
                    }
    
                    @Override  //节点状态变化
                    public void stateChanged(CuratorFramework client, ConnectionState state) {
                        System.out.println(client.toString() + "  " + state.name());
                    }
                });
                selector.autoRequeue();  //释放Leader权限后,继续竞选
                selector.start();
                selectors.add(selector); 
            }
    

    分布式锁

    分布式锁保证多个节点之间的操作可以同步,即任意时间点只有一个节点持有锁。

    分布式锁实现原理

    1. zookeeper首先创建一个/lock节点
    2. 当有节点获取锁是,先为这个节点创建临时节点,例如lock-702564158761685-000001,序列号按创建顺序递增。
    3. zookeeper会检查 lock-702564158761685-000001 是否/lock下的最小节点,如果是该节点得到锁,否则监听 lock-702564158761685-000001 前一个节点状态
    4. 当前一个节点的状态发生变化时回到步骤1,继续竞争锁。


      image.png

    分布式锁代码实现

    1. 创建分布式锁
    CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .retryPolicy(retryPolicy)
                        .connectionTimeoutMs(CONNECTION_TIMEOUT)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .build();
    client.start();
    
    InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(client, "/lock");
    mutex = new InterProcessSemaphoreMutex(clnt, path) 
    
    1. 通过 mutex.acquire()获取锁
    mutex.acquire();
    
    1. 通过mutex.release()释放锁
    mutex.release();
    

    Curator Recipes封装了底层细节,让用户可以更快捷地实现各种分布式应用场景。除了本文列出的leader选举和分布式锁,还有分布式队列,节点缓存,Barrier等。可以参考官方文档http://curator.apache.org/curator-recipes/index.html上的api。

    相关文章

      网友评论

          本文标题:5分钟学习zookeeper:Curator Recipes

          本文链接:https://www.haomeiwen.com/subject/tcbotltx.html