美文网首页互联网架构程序员
Raft算法分析以及在软负载中的实现研究

Raft算法分析以及在软负载中的实现研究

作者: zqrferrari | 来源:发表于2017-11-15 19:45 被阅读99次

    分布式系统下数据的一致性和系统可用性一直是个难题,工程上也有多种解决方案,如:mysql/RocketMQ的主备复制方案,本文将借助软负载服务的研究对Raft分布式一致性协议做分析和讨论。

    本文主要分为以下几个部分:

    • 分布式下数据一致性的问题解决方
    • 服务发现/软负载/配置管理的介绍
    • Raft协议的原理
    • Raft在软负载集群中的实现

    1、分布式下数据一致性问题解决方案


    根据CAP理论,分布式系统中,一致性、可用性和分区容错性三者无法同时满足,并且最多只能满足其中两个。

    对于分布式系统的高可用方案,业界有一些通用的解决方案:


    分布式高可用方案对比

    其中横轴代表了分布式系统中通用的高可用解决方案,如:冷备、Master/Slave、Master/Master、两阶段提交以及基于Paxos算法的解决方案;纵轴代表了分布式系统所关心的各项指标,包括数据一致性、事务支持的程度、数据延迟、系统吞吐量、数据丢失可能性、故障自动恢复时间。

    Master/Slave解决方案的典型实现有mysql、RocketMQ。

    Paxos解决方案的实现有阿里的OceanBase。

    Raft协议产生的背景在于解决分布式系统下数据的一致性问题,本文将主要集中在介绍Raft算法实现的研究上。

    2、ConfigServer/VipServer/Diamond


    ConfigServer(服务发现)、VipServer(软负载)、Diamond(配置管理)三个产品容易引起疑惑,因为三者有一定的相似性,功能上都有配置管理的能力,设计上都基于配置的pub/sub模式,也都需要解决数据的一致性和应用的稳定性问题,因此难免会让人产生困惑,为什么需要三套不同的产品来实现近似一样的功能,对此我们先明确下三者在应用结构中所处的位置,如下图:

    ConfigServer/Diamond/VipServer的关系

    一次请求的过程如下:

    1. 用户通过域名www.taobao.com访问,经过DNS解析获得域名地址,请求到达淘宝服务器机房
    2. http请求进过lvs进行一次负载均衡,到达ngnix服务器
    3. ngnix服务器通过内置的vipService客户端,按照一定算法选择一个ip
    4. 根据ip被ngnix转发到实际处理的机器,对应的应用服务开始处理请求
    5. 如果需要进行rpc调用,则用户configServer获取对应服务的机器ip,发起rpc调用
    6. 请求处理过程中,如果需要获取配置则通过diamond来获取

    由此可见:

    • ConfigServer/VipServer关注于“服务发现”领域,服务=ip:port,运行时需要对服务进行活性检测,动态管理

      • ConfigServer主要服务于内部PRC框架的服务发现
      • VipServer主要服务于http服务发现,后期希望替换LVS发挥负载均衡的能力
    • Diamond关注与“服务配置”领域,配置=key:value,配置一经添加,一直存在,无需活性检测

    在上图的架构中VipServer集群对于数据的持久化和一致性的保障是基于Diamond的,而Diamond则是基于mysql做数据的持久化。过重的依赖无疑会极大的限制VipServer在负载均衡上的能力,阿里巴巴负载均衡技术之LVS和VIPServer的发展介绍中,未来VipServer将下沉作为基础的负载均衡设施用于替换lvs,上图中3替换掉1,所以去依赖是VipServer必须要完成的,去掉对Diamond的依赖之后,对于数据的持久化和一致性则需要自己来完成。

    去Diamond之后,VipServer集群中的每台server都会存储全量数据,提高了系统容灾能力,其中集群中机器上的数据一致性使用Raft协议来保证。

    3、Raft的原理


    在介绍Raft算法原理之前,可以借助Raft的动画演示先直观的看看Raft在解决什么问题以及解决的过程。

    相比于Paxos的晦涩难懂,Raft被设计的更易于理解和实现:

    • Raft简化了状态空间,集群中的机器在任何时候只能处于leader、follower和candidate三种状态之一
    • Raft为了解决Paxos架构对真实场景下的架构不够友好的问题,被设计为单leader算法
      选举出一个leader负责管理复制日志,保障一致性其他服务器都是follower,被动地接受数据
      leader挂掉,部分follower会参加竞选,其中一个称为下一次的leader
    • leader宕机,会触发系统自动重新选主,选主期间系统对外不可服务
    • Raft中少数派宕机,不会影响系统整体的可用性

    Raft算法状态转换如下图,主要涉及到leader选举、日志复制(数据同步)、安全性三个部分:

    Raft状态转换图

    3.1 leader选举

    集群选举的过程如下:

    • 集群刚启动时,所有机器都是follower,这个时候系统整体不可写

    • follower先等待,在等待期间如果没有收到leader的心跳,则认为leader已经不可用,并将自己作为candidate向集群中的其他机器发起选举,选取自己为leader。

    • 其他follower收到candidate发送过来的选举请求,并将candidate的选举号(term)和自己的选举号比较,如果candidate大于自己的选举号,则投票给candidate作为leader,否则放弃投票。

    • 对每个candidate来说,这次竞选会有三种情况:

      • 如果candidate收到超一半的选票,那么它获胜为这次选举的leader,本次竞选结束,接下来就是向其他服务器发送心跳、并对外提供服务
      • 如果candidate选票不够,却有其他candidate说自己成为leader,那么candidate退化为follower,本次竞选结束
      • 如果candidate选票不够,且没有其他服务器自称竞选成功(比如三个candidate平分选票时),那么就只能再等随机长度的一段时间,重新发起下一轮的竞选号召选票过程

    3.2 日志复制(数据同步)

    leader被选举出来以后,所有的读写请求都会经过leader来处理,leader负责对follower进行数据同步,即日志复制。

    日志复制包括历史数据同步和数据更新请求:

    • 历史数据同步,leader会和所有follower进行心跳,心跳会带上最新的数据版本号

      • 如果follower有这个版本号并且是最新的那个,说明它的数据已经是最新,什么同步都不用做
      • 如果follower有这个版本号但不是最新的那个,则保持leader的数据
    • 更新请求,leader接收请求,如果不失败是按照下述步骤执行的:

      • 在本机上保存数据(未提交)
      • 发给所有follower让它们也保存数据(未提交)
      • leader收到过半的成功回复,认为保存成功,leader就会在本机上正式提交这份数据
      • 回复客户端操作完毕
      • 向所有的follower发送提交命令

    3.3 安全性

    集群中机器在可能遇到的问题:

    • 网络延迟、机器宕机恢复等原因,是否会导致竞选错误任期错误?

      不会,每轮选的candidate要选票的时候,都需要附带当前选举号;而follower需要保证,每个选期只投票给第一个联系自己的candidate

    • 如果第一轮选出来的leader因为延迟没告诉某个candidate,在不同选期内是否会出现两个leader同时存在?

      不会,在follower在接收到candidate的选举请求时,需要对比candidate的选举号(term),如果candidate的term比自己的小则拒绝投票。

    • 是否存在选举无主问题,即一直选不出leader?

      simple-decree Paxos也存在这种活性(Liveness)问题,Raft的解法是,通过将等待时间随机化(比如150ms~300ms)来减少这种情况。

    • leader是否会删除或者更改数据?

      不会,被选举出的leader被设计为数据正确且完整的(有所有提交的数据),保证了数据只会从leader流向follower

    • 如果某个follower在日志复制时失败,然后又在新的选举中当选,那么有可能选举出一个数据不正确或不完整的leader?又或者是否选举出的leader会不会是没有最新数据的那个?

      不会,Raft采用了一种巧妙的方法:

      • candidata在号召投票给自己时,(除了需要带上选举版本号外)还需要带上自己的数据最新版本号
      • follower在投票时,(除了只给最新选举版本号外)不给数据版本号低于自己同意票

    4、Raft在软负载集群中的实现


    软负载集群中的机器都保存全量的机器信息(host:ip+port),机器之间的数据同步由Raft协议保证。

    根据Raft原理的介绍,在实际的实现过程中,首要解决leader的选举过程。

    4.1 选举

    机器启动之后,每台机器都会注册选举和心跳的定时任务。

    public static void init() throws Exception {
        peers.add(VIPServerProxy.getVIPServers());
    
        RaftStore.load();
    
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
        }
    
        Timer.register(new MasterElection());
        Timer.register1(new HeartBeat());
        Timer.register(new AddressServerUpdater(), Timer.ADDRESS_SERVER_UPDATE_INTVERAL_MS);
    
        if (peers.size() > 0) {
            if (lock.tryLock(3, TimeUnit.SECONDS) ) {
                initialized = true;
                lock.unlock();
            }
        } else {
            throw new Exception("peers is empty.");
        }
    }
    

    选举的等待时长是不定的,用于避免选举的无主问题

    public void resetLeaderDue() {
        leaderDueMs = Timer.LEADER_TIMEOUT_MS + RandomUtils.nextLong() % Timer.RAMDOM_MS;
    }
    

    等待时长过期则开始选举

    public static class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                RaftPeer local = peers.local();
                local.leaderDueMs -= Timer.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0) {
                    return;
                }
    
                local.resetLeaderDue();
                local.resetHeartbeatDue();
    
                //等待过期开始发起选举
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("VIPSERVER-RAFT", "error while master election", e);
            }
        }
        
        public static void sendVote() {
            RaftPeer local = peers.get(NetUtils.localIP());
            local.term++;
            local.voteFor = local.ip;  //自己作为CANDIDATE,发送给其他follower
            local.state   = RaftPeer.State.CANDIDATE;
            
              //向其他机器发送选举请求
              voteRets = sendVoteToAllFollowers(local);
              
              //根据选举结果确定自己是否成为了leader,得到超过一半的选票
              decideLeader(voteRets)
        }
        
        //follower接收CANDIDATE发送的选举请求
        public static RaftPeer receivedVote(RaftPeer remote) {
            RaftPeer local = peers.get(NetUtils.localIP());
              //CANDIDATE的选举号小于自己的,不参与投票
            if (remote.term <= local.term) {
                throw new IllegalArgumentException(msg);
            }
    
            //开始投票,重设自己的选举等待周期
            local.resetLeaderDue();
    
            //自己作为投票FOLLOWER给remote
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
            local.term = remote.term;
        }
    }
    

    如果选举成功,则CANDIDATE成为leader,leader开始同follower进行心跳和日志复制(数据同步)。
    如果选举失败,则等待下一个follower的选举周期到期,进行新一轮选举。

    4.2 心跳

    leader负责对向所有的follower发送心跳和数据同步。

     public static class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= Timer.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
    
                local.resetHeartbeatDue();
    
                   //定时心跳
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("VIPSERVER-RAFT", "error while sending beat", e);
            }
    
        }
     
         public static void sendBeat() throws IOException, InterruptedException {
            //自己不是leader不能发生心跳
            if (local.state != RaftPeer.State.LEADER) {return;}
    
            local.resetLeaderDue();
    
            //构建leader上的数据信息key、timestamp,不包括具体的value
            data.raftPeer  = local;
            data.key       = key;
            data.timestamp = timestamp;
            data.value     = null;
            
            sendBeatToAllFolowers(data.key);
         
            //...
         }
         
         public static RaftPeer receivedBeat(JSONObject beat) throws Exception {
            final RaftPeer local = peers.local();
            final RaftPeer remote = beat.getObject("peer", RaftPeer.class);
    
            //收到的心跳不是leader发来的,丢弃
            if (remote.state != RaftPeer.State.LEADER) {
                throw new IllegalArgumentException("xxx");
            }
    
            //收到心跳的选举号小于自己的,丢弃
            if (local.term > remote.term) {
                throw new IllegalArgumentException("xxx");
            }
    
            if (local.state != RaftPeer.State.FOLLOWER) {
                local.state = RaftPeer.State.FOLLOWER;
                local.voteFor = remote.ip;
            }
    
            final JSONArray beatDatums = beat.getJSONArray("datums");
            
            //重置自己的选举周期和心跳周期,用于follower长时间没有收到leader发来的心跳
            //leader挂掉时,自己可以重新发起选举流程
            local.resetLeaderDue();
            local.resetHeartbeatDue();
    
            peers.makeLeader(remote);
            
            //根据leader data.timestamp和本机的timestamp,确定是否需要从leader更新数据
            updateDataFromLeader(data);
            
            //...
         }
         
         public static void updateDataFromLeader(remoteData){
            if(remoteData.timestamp > local.timestamp){
                //从leader获取数据并更新
            }
         }
     }
    

    4.3 数据更新

    整个集群中,只有leader能对外提供数据更新服务。

     public static void signalPublish(String key, String value) throws Exception {
        operateLock.lock();
    
        try {
            if (!RaftCore.isLeader()) {
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
    
                //如果自己不是leader,则将请求转发给leader去处理
                RaftProxy.proxyPostLarge(API_PUB, params.toJSONString());
                return;
            }
    
              //请求到达leader,处理数据更新
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            datum.timestamp = System.currentTimeMillis();
    
            JSONObject json = new JSONObject();
            json.put("datum", datum);
            json.put("source", peers.local());
    
            //leader数据更新
            onPublish(json);
            
            //发送数据到所有的follower进行更新
            successOverHalf = sendPublisToAllFollowers(datum);
      
            //数据是否有一半机器更新成功,则通知保存数据
            if(successOverHalf){
                 sendStoreToAllFollowers()
            }
      
              //...
      }
    

    follower接收到leader发送来的数据更新请求

    public static void onPublish(JSONObject params){
        RaftPeer source = params.getObject("source", RaftPeer.class);
        Datum datum = params.getObject("datum", Datum.class);
        
        //如果不是leader发送过来的则丢弃
        if (!PeerSet.isLeader(source.ip)) {
           throw new IllegalStateException("xxx");
        }
    
         //如果接收到的选举版本号低于自己的则丢弃
        if (source.term < local.term) {
            throw new IllegalStateException("xxx");
        }
        
        //更新自己的选举时间
        local.resetLeaderDue();
        
        //更新数据到文件
        RaftStore.write(datum);
    }      
    

    5、后记

    本文专注于软负载系统中数据的一致性保证以及raft协议的分析,对于软负载系统中涉及到其他的问题,如:负载均衡的策略、活性检测、集群部署方案等没有过多的深入。

    相关文章

      网友评论

        本文标题:Raft算法分析以及在软负载中的实现研究

        本文链接:https://www.haomeiwen.com/subject/oivhvxtx.html