美文网首页
zookeeper Master选举

zookeeper Master选举

作者: 邝健强 | 来源:发表于2018-09-04 21:14 被阅读0次

简述

在分布式环境中,常常需要遇到这样一个场景:对于一个复杂的任务,需要从集群中选举一台机器进行处理即可。这种分布式问题就是“Master选举”。

思路

利用zookeeper的特性很容易实现Master选举功能,思路如下:

选择一个根节点,例如:/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用zookeeper特性,最终只有一个机器能够创建成功,成功的机器就是Master。
master选举还可以利用数据库的主键唯一性进行,这里不在细说。

Curator实现

Curator是对zookeeper的封装,可以调用更简单的api实现选举。Curator提供两种Leader选举策略:

  1. Leader Latch
    随机选取一台机器作为master,除非显示调用close方法释放leadership,否则其他机器无法成为master。这种方案适合主备应用,在主应用宕机后,从剩下的备用应用选出一个成为新的主应用。
  2. Leader Election
    这种选举策略跟Leader Latch选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。

Leader Latch

package com.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class LeaderLatchDemo {

    private static final String PATH = "/demo/leader";


    public static void main(String[] args)throws  Exception {

        String SERVER ="127.0.0.1:2181";

        List<LeaderLatch> latchList = new ArrayList<LeaderLatch>();
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>();

        try {

            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient(SERVER);
                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);

                leaderLatch.addListener(new LeaderLatchListener() {
                    public void isLeader() {
                        System.out.println("I am leader:"+leaderLatch.getId());
                    }

                    public void notLeader() {

                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(5000);
            LeaderLatch leader = null;


            for(LeaderLatch leaderLatch : latchList){
                if(leaderLatch.hasLeadership()){
                    leader = leaderLatch;
                    break;
                }
            }

            System.out.println("当前leader是: " + leader.getId());
            leader.close();
            latchList.remove(leader);

            LeaderLatch firstNode = latchList.get(0); //获取此时第一个节点
            System.out.println("删除leader后,当前第一个节点: " + firstNode.getId());

            firstNode.await(10, TimeUnit.SECONDS); //阻塞并尝试获取领导权,可能失败

            //再次获取当前leader
            for(LeaderLatch tmp : latchList){
                if(tmp.hasLeadership()){
                    leader = tmp;
                    break;
                }
            }

            System.out.println("最终实际leader是: " + leader.getId());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for(CuratorFramework client : clients){
                CloseableUtils.closeQuietly(client);
            }

            for(LeaderLatch leaderLatch : latchList){
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }

    private static CuratorFramework getClient(String connectString) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

输出:

client#6:I am leader. I am doing jobs!
当前leader是: client#6
删除leader后,当前第一个节点: client#0
client#9:I am leader. I am doing jobs!
最终实际leader是: client#9

Leader Election


public class CustomLeaderSelectorListenerAdapter  extends
        LeaderSelectorListenerAdapter implements Closeable {

    private String name;
    private LeaderSelector leaderSelector;
    public AtomicInteger leaderCount = new AtomicInteger();

    public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name
    ) {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);

        /**
         * 自动重新排队
         * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
         */
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    /**
     * 获取领导权
     */
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = 2;
        System.out.println(name + "成为当前leader");
        System.out.println(name + " 之前成为leader的次数:" + leaderCount.getAndIncrement() + "次");

        //TODO 其他业务代码
        try{
            //等待2秒后放弃领导权(模拟业务执行过程)
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        }catch ( InterruptedException e ){
            System.err.println(name + "已被中断");
            Thread.currentThread().interrupt();
        }finally{
            System.out.println(name + "放弃领导权\n");
        }

    }
}

public class TestLeaderElection {
    //会话超时时间
    private final int SESSION_TIMEOUT = 30 * 1000;

    //连接超时时间
    private final int CONNECTION_TIMEOUT = 3 * 1000;

    //客户端数量
    private final int CLIENT_NUMBER = 10;

    //ZooKeeper服务地址
    private static final String SERVER = "127.0.0.1:2181";

    private final String PATH = "/curator/latchPath";

    //创建连接实例
    private CuratorFramework client = null;

    /**
     * baseSleepTimeMs:初始的重试等待时间
     * maxRetries:最多重试次数
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    //自定义LeaderSelectorListenerAdapter实例集合
    List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList
            = new ArrayList<CustomLeaderSelectorListenerAdapter>();

    public void init() throws Exception{
        //创建 CuratorFrameworkImpl实例
        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
        client.start();

        for(int i=0;i<CLIENT_NUMBER;i++){
            //创建LeaderSelectorListenerAdapter实例
            CustomLeaderSelectorListenerAdapter leaderSelectorListener =
                    new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + i);

            leaderSelectorListener.start();
            leaderSelectorListenerList.add(leaderSelectorListener);
        }

        //暂停当前线程,防止单元测试结束,可以让leader选举过程持续进行
        TimeUnit.SECONDS.sleep(600);
    }

    /**
     * 测试完毕关闭连接
     */
    public void close(){
        for(CustomLeaderSelectorListenerAdapter tmp : leaderSelectorListenerList){
            CloseableUtils.closeQuietly(tmp);
        }

        CloseableUtils.closeQuietly(client);
    }


    public  static void main(String[] args)throws Exception{
        TestLeaderElection test = new TestLeaderElection();
        test.init();
        test.close();
    }
}

输出:

Client #3成为当前leader
Client #3 之前成为leader的次数:0次
Client #3放弃领导权

Client #2成为当前leader
Client #2 之前成为leader的次数:0次
Client #2放弃领导权

Client #8成为当前leader
Client #8 之前成为leader的次数:0次
Client #8放弃领导权

Client #4成为当前leader
Client #4 之前成为leader的次数:0次
Client #4放弃领导权

Client #9成为当前leader
Client #9 之前成为leader的次数:0次
Client #9放弃领导权

Client #1成为当前leader
Client #1 之前成为leader的次数:0次
Client #1放弃领导权

可见,每次执行完takeLeaderShip的方法后,就会释放领导权。

参考

https://www.zifangsky.cn/1191.html
https://github.com/apache/curator/blob/master/curator-examples/src/main/java/leader

相关文章

  • zookeeper系列

    zookeeper系列(一)zookeeper必知 √zookeeper系列(二)实战master选举 √zo...

  • zookeeper Master选举

    简述 在分布式环境中,常常需要遇到这样一个场景:对于一个复杂的任务,需要从集群中选举一台机器进行处理即可。这种分布...

  • zookeeper总结

    zookeeper的应用案例:Hbase:使用zookeeper进行master选举,服务间协调。Solr:使用z...

  • zookeeper实现master选举

    zookeeper其中的一个功能就是实现集群master的选举功能,这篇文章就来实现一下这个master选举功能,...

  • Zookeeper1:Zookeeper基本原理

    1.zookeeper选举机制 Zookeeper虽然在配置文件中并没有指定master和slave,但是,zoo...

  • 基于zookeeper搭建sparkHA

    企业中一般都使用zookeeper的高可用搭建Spark。其原理就是通过zookeeper来选举一个Master,...

  • 利用Zookeeper实现 - Master选举

    原文始发于:利用Zookeeper实现 - Master选举 Zookeeper 是一个高可用的分布式数据管理与协...

  • zookeeper 中master选举与leader选举

    开始学zookeeper 中,master选举于leader 选举2个概念比较模糊,谈下自己的理解,如有不对地方请...

  • 2018-08-06:05-使用ZKclient实现master

    上一节使用zookeeper实现分布式锁,这节使用ZKClient实现master选举ZKClient使用比zoo...

  • Curator调用实例

    Curator是zookeeper使用最广泛的工具(1)zk的应用场景:分布式锁、Master选举(2)zk的应用...

网友评论

      本文标题:zookeeper Master选举

      本文链接:https://www.haomeiwen.com/subject/iajgwftx.html