美文网首页大数据Paxos程序员
3. PhxPaxos源码分析之Proposer、Accepto

3. PhxPaxos源码分析之Proposer、Accepto

作者: 随安居士 | 来源:发表于2017-11-14 21:55 被阅读181次

    目录
    1. PhxPaxos源码分析之关于PhxPaxos
    2. PhxPaxos分析之网络基础部件
    3. PhxPaxos源码分析之Proposer、Acceptor
    4. PhxPaxos源码分析之Learner
    5. PhxPaxos源码分析之状态机
    6. PhxPaxos源码分析之归档机制
    7. PhxPaxos源码分析之整体架构


    3.1 基本概念

    Paxos算法包含三个角色:

    • Proposer
      提案发起者。负责发起提案,提案本身由编号(number)、值(value)两部分组成。
    • Acceptor
      提案接收者。负责接收Proposer的提案,通过一定的规则首先确定提案编号、最终确定提案值。
    • Learner
      选中提案值习得者。不参与提案确定过程,只负责学习已确定好的提案值。

    简单来说,提案由每个节点的Proposer、Acceptor参与决策,如果某个节点由于网络故障等原因未参与决策,由Learner负责学习已经选中提案值。

    Paxos算法本身并不好理解,这里按通俗易懂的说法讲述整个提案确定过程,完整的Paxos算计介绍请参见Paxos made simple 释译

    注意:一个完整的提案由{编号n,提案值v}两部分组成。我们只关心最终的提案值,提案编号的唯一用途在于确定提案值。

    1. Proposer选择新的编号n,并发送到所有的Acceptor。
    2. Acceptor接收到请求后完成如下工作:
      2.1 承诺(promise):承诺不再accept所有编号小于n的请求,即reject编号小于n的请求。
      2.2 返回(reply):返回当前小于n的最大编号所对应的提议值v(如果存在)。
    3. 如果Proposer发起的提案编号被半数以上的Acceptor接受,此时它可以真正的提案值,否则回到步骤一重新确定提案编号。
    4. 即便Acceptor接受了Proposer发起的提案编号,提案值也不是Proposer随意确定的。确定规则如下:
      4.1 如果步骤2.2中,一个或者多个Acceptor返回了已经被接受(accept)的提案值,新发起的提案值必须是最大编号返回的提案值。
      4.2 如果步骤2.2中,没有任何Acceptor返回已被接受(accept)的提案值,由Proposer自行发起新的提案值。
    5. Proposer发起的提案值如果被接受则提案结束,否则重复上述过程,直到确定提案值。

    Paxos算法分为提案选定阶段(Prepare)、确定提案值(accept)两个阶段。每个阶段都有可能需要执行多次,每次都花费一个RTT网络耗时。为了减少网络耗时,达到产品级应用的目的,PhxPaxos采用选主并在主上执行Paxos算法的方式避免提案冲突,随后为所有的提案执行noop操作,有条件跳过Prepare阶段将网络花费降至理论最小值(一个RTT)。

    本章先来看Proposer、Acceptor这两个角色,Learner将在下一节讲解。

    3.2 Proposer

    Proposer为提案的发起者,入口函数如下:

        int Proposer :: NewValue(const std::string& sValue)
        {
            BP->GetProposerBP()->NewProposal(sValue);
     
            if (m_oProposerState.GetValue().size() == 0)
            {
                m_oProposerState.SetValue(sValue);
            }
    
            m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
            m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
    
            // 如果允许跳过Prepare阶段 && 当前未被其他节点拒绝
            if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
            {
                BP->GetProposerBP()->NewProposalSkipPrepare();
                // 直接进入提案值确定阶段
                PLGHead("skip prepare, directly start accept");
                Accept();
            }
            else
            {
                 // 先进入Prepare阶段,随后进入提案值确定阶段
                //if not reject by someone, no need to increase ballot
                Prepare(m_bWasRejectBySomeone);
            }
    
            return 0;
        }
    

    满足如下场景,允许跳过Prepare阶段:

    • 本节点之前已经执行过Prepare阶段,并且Prepare阶段的执行结果为Accept。

    当不满足上述场景时,需要执行完整的Paxos两阶段流程。来看Prepare阶段。

    3.2.1 Prepare阶段

    Prepare的入口函数如下:

        void Proposer :: Prepare(const bool bNeedNewBallot)
        {
            PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
                    GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
                    m_oProposerState.GetValue().size());
    
            BP->GetProposerBP()->Prepare();
            m_oTimeStat.Point();
    
            //重置Proposer状态,准备进入Prepare阶段。
            ExitAccept();
            m_bIsPreparing = true;
            m_bCanSkipPrepare = false;
            m_bWasRejectBySomeone = false;
    
            m_oProposerState.ResetHighestOtherPreAcceptBallot();
    
            //分配新的提案编号
            if (bNeedNewBallot)
            {
                m_oProposerState.NewPrepare();
            }
    
            PaxosMsg oPaxosMsg;
            oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
            oPaxosMsg.set_instanceid(GetInstanceID());
            oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
            oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
    
            m_oMsgCounter.StartNewRound();
    
            //Prepare超时定时器
            AddPrepareTimer();
    
            PLGHead("END OK");
    
            //发送Prepare消息
            BroadcastMessage(oPaxosMsg);
        }
    

    发起Prepare主要做4件事:

    1. Proposer状态重置。表明当前开始进入Prepare阶段。
    2. 按需分配新的提案编号。按需的意思是指,当有其他节点明确拒绝了该提案,按Paxos协议必须使用新的提案编号重写发起提案;而如果并无其他节点拒绝,即由于超时等原因导致的重新发起提案,可沿用原有的编号。
    3. 设置Prepare超时定时器。Prepare超时原因有很多,比如网络丢包。当Prepare超时时,处理方式也很简单,重新执行Prepare。
    void Proposer::OnPrepareTimeout()
    {
        PLGHead("OK");
        //本轮提案已经选举结束,不再执行任何操作。
        if (GetInstanceID() != m_llTimeoutInstanceID)
        {
            PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
                   m_llTimeoutInstanceID, GetInstanceID());
            return;
        }
    
        BP->GetProposerBP()->PrepareTimeout();
        //重新发起Prepare
        Prepare(m_bWasRejectBySomeone);
    }
    
    1. 发送Prepare消息。消息采用UDP方式发送。发送内容包括本轮提案的实例编号、节点编号、提案编号、消息类型等。如果我们依次发起了多轮提案,每轮实例编号依次为1、2、3...,某些节点如果落后于其他节点,需要通过实例编号隔离不同的提案请求。关于整个实例的概念,后面后有详细的讲解。

    Prepare消息是发往各个节点的Acceptor的,Acceptor处理完成后发送Replay消息。Proposer交由OnPrepareReply处理Replay消息,逻辑如下:

    1. 判定消息的有效性。包括实例编号、提案编号是否一致;当前是否处在Prepare阶段等。代码略。
    2. 根据消息内容更新Proposer状态。包括更新接收节点信息、接收或拒绝状态更新。
        //更新:已从node id节点获取数据
        m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
    
        if (oPaxosMsg.rejectbypromiseid() == 0)
        {
            //更新:接受该提案,并将该节点的promise id(承诺提案编号)、提案值更新到本地。
            BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
            PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
                     oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
            m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
            m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
        }
        else
        {
            //更新:该提案被拒绝,标明本轮存在拒绝请求的节点(重新发起提案时需要更新提案值)。并将该节点已承诺的提案编号更新到本地。
            PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
            m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
            m_bWasRejectBySomeone = true;
            m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
        }
    
    1. 根据已收到的各个节点的回复,判定是否进入Accept阶段,或者重新发起Prepare。
        //已收到超过半数的Accept回复消息,直接进入Accept阶段。
        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->PreparePass(iUseTimeMs);
            PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
    
            //最近一次发起的Prepare被接受,后续可跳过Prepare阶段。
            m_bCanSkipPrepare = true;
    
            //进入Accept阶段。
            Accept();
        }
        //已收到超过半数的Reject回复消息或者所有节点已回复(这个判断并不需要),重新进入Prepare阶段
        else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->PrepareNotPass();
            PLGImp("[Not Pass] wait 30ms and restart prepare");
    
            //重置Prepare超时定时器,提前触发Prepare超时。
            AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
        }
    

    3.2.2 Accept阶段

    Accept的入口函数如下:

    void Proposer::Accept()
    {
        PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
                m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
    
        BP->GetProposerBP()->Accept();
        m_oTimeStat.Point();
    
        //标识进入Accept阶段
        ExitPrepare();
        m_bIsAccepting = true;
    
        PaxosMsg oPaxosMsg;
        oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
        oPaxosMsg.set_value(m_oProposerState.GetValue());
        oPaxosMsg.set_lastchecksum(GetLastChecksum());
    
        m_oMsgCounter.StartNewRound();
    
        //添加Accept超时定时器
        AddAcceptTimer();
    
        PLGHead("END");
    
        //发送Accept消息
        BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
    }
    

    Accept和Prepare阶段的request操作如出一辙:

    1. Proposer状态重置。表明当前开始进入Accept阶段。
    2. 设置Accept超时定时器。Accept超时原因有很多,比如网络丢包。当Accept超时时,处理方式也很简单,重新进入Prepare阶段。
    void Proposer::OnAcceptTimeout()
    {
        PLGHead("OK");
        //本轮提案已经选举结束,不再执行任何操作。
        if (GetInstanceID() != m_llTimeoutInstanceID)
        {
            PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
                   m_llTimeoutInstanceID, GetInstanceID());
            return;
        }
    
        BP->GetProposerBP()->AcceptTimeout();
        //重新发起Prepare
        Prepare(m_bWasRejectBySomeone);
    }
    
    1. 发送Accept消息到其他节点。消息采用UDP方式发送。发送内容包括本轮提案的实例编号、节点编号、提案编号、消息类型、提案值等。

    OnAcceptReply处理逻辑和OnPrepareReply类似,包括如下几步:

    1. 判定消息有效性。包括实例编号、提案编号是否一致;当前是否处在Accept阶段等。
    2. 根据消息内容更新Proposer状态。包括更新接收节点信息、接收或拒绝状态更新。
    3. 根据已收到的各个节点的回复,判定Accept阶段是否已完成,或者重新发起Prepare。
        //提案被接受,提前退出Accept阶段
        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->AcceptPass(iUseTimeMs);
            PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
            ExitAccept();  
            //通知Learner,本轮提案已完成
            m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
        }
        else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->AcceptNotPass();
            PLGImp("[Not pass] wait 30ms and Restart prepare");
    
            //Accept失败,终止Accept阶段。
            AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
        }
    

    如果提案为获得半数通过,即未被选中(chosen)将从prepare阶段重新发起提案。如果获得半数通过,learner通知所有节点提案被选中,即从accept状态改为chosen状态。

    3.3 Acceptor

    Acceptor做为提案的被动参与者,也分为Prepare和Accept两个阶段

    3.3.1 Prepare阶段

    Proposer发送的Prepare消息由Acceptor的OnPrepare处理,逻辑如下:

    int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
    
        BP->GetAcceptorBP()->OnPrepare();
        
        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
    
        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
        
        //当前的提案编号大于已承诺的编号,接受该提案
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
            //返回之前承诺的提案编号
            oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
            oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
            //如果之前已经接受过某个提案的提案值,返回该提案值
            if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
            {
                oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
            }
    
            //将承诺的提案更新为当前提案
            m_oAcceptorState.SetPromiseBallot(oBallot);
    
            //保证P2.c的不变性,数据需要入库
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnPreparePersistFail();
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
                
                return -1;
            }
    
            BP->GetAcceptorBP()->OnPreparePass();
        }
        else
        {
            BP->GetAcceptorBP()->OnPrepareReject();
    
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);
            //已存在更高编号的提案,拒绝该提案并返回已承诺提案编号        
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }
    
        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
    
        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());;
    
        //发送Replay消息
        SendMessage(iReplyNodeID, oReplyPaxosMsg);
    
        return 0;
    }
    

    OnPrepare函数看似并未做任何有效性校验,但这部分校验是必不可少的,并未省去,而是出现在了调用OnPrepare的Instance类的上层函数中。关于Instance后面也会单独说明,这里的校验主要是保证参数中的instance id和acceptor一致。

    另外一点需要说明的是:前面一直提到的提案编号并不为完整的Paxos意义上的提案编号,完整的提案编号由提案编号(proposal id)+节点Id(node id)两部分组成。代码上看,BallotNumber代表了Paxos意义上的提案编号。

    class BallotNumber
    {
    public:
        ......
        uint64_t m_llProposalID;
        nodeid_t m_llNodeID;
    };
    

    3.3.2 Accept阶段

    Proposer发送的Accept消息由Acceptor的OnAccept处理,逻辑如下:

    void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
    
        BP->GetAcceptorBP()->OnAccept();
    
        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
    
        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
    
        //当前的提案编号大于或等于已承诺的编号,接受该提案
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
            
            //更新提案编号、提案值
            m_oAcceptorState.SetPromiseBallot(oBallot);
            m_oAcceptorState.SetAcceptedBallot(oBallot);
            m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
            
            //按Paxos协议P2.c不变性要求,数据需要持久化(包括实例号、提案编号、提案值)
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnAcceptPersistFail();
    
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
                
                return;
            }
    
            BP->GetAcceptorBP()->OnAcceptPass();
        }
        else
        {
            BP->GetAcceptorBP()->OnAcceptReject();
    
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);
            
            //已存在更高编号的提案,拒绝该提案并返回已承诺提案编号
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }
    
        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
    
        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());
    
        SendMessage(iReplyNodeID, oReplyPaxosMsg);
    }
    

    有一点需要说明,OnPrepare、OnAccept两个阶段中,如果处理结果为接受,那么需要保证不管在何种情况下,这个结果是不变的。因此,数据需要被持久化,但这个持久化数据只需要本地保存即可,PhxPaxos中采用leveldb做本地持久化数据库。

    3.5 并发

    前面的处理逻辑中,没有看到加锁或者其他并发保护机制。那么,如果一个instance正在处理Prepare请求时,同时接收到了Accept消息怎么办呢?其实,PhxPaxos中还有另外一个IOLoop线程,专门负责接收网络消息,并串行执行。

    IOLoop线程的run函数实现如下:

    void IOLoop :: run()
    {
        m_bIsEnd = false;
        m_bIsStart = true;
        while(true)
        {
            BP->GetIOLoopBP()->OneLoop();
    
            int iNextTimeout = 1000;
            
            DealwithTimeout(iNextTimeout);
    
            //PLGHead("nexttimeout %d", iNextTimeout);
    
            OneLoop(iNextTimeout);
    
            if (m_bIsEnd)
            {
                PLGHead("IOLoop [End]");
                break;
            }
        }
    }
    

    其中,OneLoop是我们本次关注的重点:

    void IOLoop :: OneLoop(const int iTimeoutMs)
    {
        std::string * psMessage = nullptr;
    
        m_oMessageQueue.lock();
        bool bSucc = m_oMessageQueue.peek(psMessage, iTimeoutMs);
        
        if (!bSucc)
        {
            m_oMessageQueue.unlock();
        }
        else
        {
            m_oMessageQueue.pop();
            m_oMessageQueue.unlock();
    
            if (psMessage != nullptr && psMessage->size() > 0)
            {
                m_iQueueMemSize -= psMessage->size();
                m_poInstance->OnReceive(*psMessage);
            }
    
            delete psMessage;
    
            BP->GetIOLoopBP()->OutQueueMsg();
        }
    
        DealWithRetry();
    
        //must put on here
        //because addtimer on this funciton
        m_poInstance->CheckNewValue();
    }
    

    OneLoop从m_oMessageQueue中取出一个网络消息随后交给Instance处理。每个Group的Instance启动一个IOLoop线程,不同Group彼此隔离,并发处理互不影响。

    3.4 总结

    本章简要介绍了Paxos算法原理,了解到Paxos算法的三大角色:Proposer、Acceptor、Learner。讲解了Proposer、Learner两个角色的主要代码实现,以及二者如何参与到Prepare、Accept两个阶段中。

    至于最后一个角色Learner,原本的理解认为应该是参与度最低的,逻辑最少的角色。但PhxPaxos中,Learner是三者中实现最复杂的,这部分内容将在下一章单独讲解。


    【转载请注明】随安居士. 3. PhxPaxos源码分析之Proposer、Acceptor. 2017.11.14

    相关文章

      网友评论

        本文标题:3. PhxPaxos源码分析之Proposer、Accepto

        本文链接:https://www.haomeiwen.com/subject/woijpxtx.html