pbft

作者: nit小星星 | 来源:发表于2019-02-25 09:15 被阅读4次

    Pbft就是超过三分之二的节点同意后进入下一阶段。eos先采用投票的方式选出一些超级节点。然后进行pbft过程。这样同步的速度是很快。却牺牲了区中心化原本的初衷、

    package consensus

    type PBFT interface {

    StartConsensus(request *RequestMsg) (*PrePrepareMsg, error)

    PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error)

    Prepare(prepareMsg *VoteMsg) (*VoteMsg, error)

    Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error)

    }

    以上定义了接口,没啥可说的

    package consensus

    type RequestMsg struct {

    Timestamp  int64  `json:"timestamp"`

    ClientID  string `json:"clientID"`

    Operation  string `json:"operation"`

    SequenceID int64  `json:"sequenceID"`

    }

    type ReplyMsg struct {

    ViewID    int64  `json:"viewID"`

    Timestamp int64  `json:"timestamp"`

    ClientID  string `json:"clientID"`

    NodeID    string `json:"nodeID"`

    Result    string `json:"result"`

    }

    type PrePrepareMsg struct {

    ViewID    int64      `json:"viewID"`

    SequenceID int64      `json:"sequenceID"`

    Digest    string      `json:"digest"`

    RequestMsg *RequestMsg `json:"requestMsg"`

    }

    type VoteMsg struct {

    ViewID    int64  `json:"viewID"`

    SequenceID int64  `json:"sequenceID"`

    Digest    string `json:"digest"`

    NodeID    string `json:"nodeID"`

    MsgType          `json:"msgType"`

    }

    type MsgType int

    const (

    PrepareMsg MsgType = iota

    CommitMsg

    )

    以上定义了消息类型。

    package consensus

    import (

    "crypto/sha256"

    "encoding/hex"

    )

    func Hash(content []byte) string {

    h := sha256.New()

    h.Write(content)

    return hex.EncodeToString(h.Sum(nil))

    }

    返回。h.sum的十六进制编码,不过得先返回content的sha256hash码

    我们再实现以上的接口

    package consensus

    import (

    "encoding/json"

    "errors"

    "time"

    "fmt"

    )

    type State struct {

    ViewID        int64

    MsgLogs        *MsgLogs

    LastSequenceID int64

    CurrentStage  Stage

    }

    type MsgLogs struct {

    ReqMsg        *RequestMsg

    PrepareMsgs  map[string]*VoteMsg

    CommitMsgs    map[string]*VoteMsg

    }

    type Stage int

    const (

    Idle        Stage = iota // Node is created successfully, but the consensus process is not started yet.

    PrePrepared              // The ReqMsgs is processed successfully. The node is ready to head to the Prepare stage.

    Prepared                // Same with `prepared` stage explained in the original paper.

    Committed                // Same with `committed-local` stage explained in the original paper.

    )

    // f: # of Byzantine faulty node

    // f = (n­1) / 3

    // n = 4, in this case.

    const f = 1

    ——————————————————————————————————————————————————————————————

    先贴上全部代码

    // lastSequenceID will be -1 if there is no last sequence ID.

    func CreateState(viewID int64, lastSequenceID int64) *State {

    return &State{

    ViewID: viewID,

    MsgLogs: &MsgLogs{

    ReqMsg:nil,

    PrepareMsgs:make(map[string]*VoteMsg),

    CommitMsgs:make(map[string]*VoteMsg),

    },

    LastSequenceID: lastSequenceID,

    CurrentStage: Idle,

    }

    }

    func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) {

    // `sequenceID` will be the index of this message.

    sequenceID := time.Now().UnixNano()

    // Find the unique and largest number for the sequence ID

    if state.LastSequenceID != -1 {

    for state.LastSequenceID >= sequenceID {

    sequenceID += 1

    }

    }

    // Assign a new sequence ID to the request message object.

    request.SequenceID = sequenceID

    // Save ReqMsgs to its logs.

    state.MsgLogs.ReqMsg = request

    // Get the digest of the request message

    digest, err := digest(request)

    if err != nil {

    fmt.Println(err)

    return nil, err

    }

    // Change the stage to pre-prepared.

    state.CurrentStage = PrePrepared

    return &PrePrepareMsg{

    ViewID: state.ViewID,

    SequenceID: sequenceID,

    Digest: digest,

    RequestMsg: request,

    }, nil

    }

    func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {

    // Get ReqMsgs and save it to its logs like the primary.

    state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg

    // Verify if v, n(a.k.a. sequenceID), d are correct.

    if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {

    return nil, errors.New("pre-prepare message is corrupted")

    }

    // Change the stage to pre-prepared.

    state.CurrentStage = PrePrepared

    return &VoteMsg{

    ViewID: state.ViewID,

    SequenceID: prePrepareMsg.SequenceID,

    Digest: prePrepareMsg.Digest,

    MsgType: PrepareMsg,

    }, nil

    }

    func (state *State) Prepare(prepareMsg *VoteMsg) (*VoteMsg, error){

    if !state.verifyMsg(prepareMsg.ViewID, prepareMsg.SequenceID, prepareMsg.Digest) {

    return nil, errors.New("prepare message is corrupted")

    }

    // Append msg to its logs

    state.MsgLogs.PrepareMsgs[prepareMsg.NodeID] = prepareMsg

    // Print current voting status

    fmt.Printf("[Prepare-Vote]: %d\n", len(state.MsgLogs.PrepareMsgs))

    if state.prepared() {

    // Change the stage to prepared.

    state.CurrentStage = Prepared

    return &VoteMsg{

    ViewID: state.ViewID,

    SequenceID: prepareMsg.SequenceID,

    Digest: prepareMsg.Digest,

    MsgType: CommitMsg,

    }, nil

    }

    return nil, nil

    }

    func (state *State) Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error) {

    if !state.verifyMsg(commitMsg.ViewID, commitMsg.SequenceID, commitMsg.Digest) {

    return nil, nil, errors.New("commit message is corrupted")

    }

    // Append msg to its logs

    state.MsgLogs.CommitMsgs[commitMsg.NodeID] = commitMsg

    // Print current voting status

    fmt.Printf("[Commit-Vote]: %d\n", len(state.MsgLogs.CommitMsgs))

    if state.committed() {

    // This node executes the requested operation locally and gets the result.

    result := "Executed"

    // Change the stage to prepared.

    state.CurrentStage = Committed

    return &ReplyMsg{

    ViewID: state.ViewID,

    Timestamp: state.MsgLogs.ReqMsg.Timestamp,

    ClientID: state.MsgLogs.ReqMsg.ClientID,

    Result: result,

    }, state.MsgLogs.ReqMsg, nil

    }

    return nil, nil, nil

    }

    func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {

    // Wrong view. That is, wrong configurations of peers to start the consensus.

    if state.ViewID != viewID {

    return false

    }

    // Check if the Primary sent fault sequence number. => Faulty primary.

    // TODO: adopt upper/lower bound check.

    if state.LastSequenceID != -1 {

    if state.LastSequenceID >= sequenceID {

    return false

    }

    }

    digest, err := digest(state.MsgLogs.ReqMsg)

    if err != nil {

    fmt.Println(err)

    return false

    }

    // Check digest.

    if digestGot != digest {

    return false

    }

    return true

    }

    func (state *State) prepared() bool {

    if state.MsgLogs.ReqMsg == nil {

    return false

    }

    if len(state.MsgLogs.PrepareMsgs) < 2*f {

    return false

    }

    return true

    }

    func (state *State) committed() bool {

    if !state.prepared() {

    return false

    }

    if len(state.MsgLogs.CommitMsgs) < 2*f {

    return false

    }

    return true

    }

    func digest(object interface{}) (string, error) {

    msg, err := json.Marshal(object)

    if err != nil {

    return "", err

    }

    return Hash(msg), nil

    }

    —————————————————————————————————————————————————————————————-分割

    初始化的时候得创建一个concensus

    func CreateState(viewID int64, lastSequenceID int64) *State {

    return &State{

    ViewID: viewID,

    MsgLogs: &MsgLogs{

    ReqMsg:nil,

    PrepareMsgs:make(map[string]*VoteMsg),

    CommitMsgs:make(map[string]*VoteMsg),

    },

    LastSequenceID: lastSequenceID,

    CurrentStage: Idle,

    }

    }

    这个viewid lastseq是出块者的身份证和上一次出块的序列号,就相当于排队领号。这个seq使用Linux时间。

    如果我是出块者,那么我得发送一个req请求到领导者节点上。

    也就是开始合约:

    func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) {

    // `sequenceID` will be the index of this message.

    sequenceID := time.Now().UnixNano()

    // Find the unique and largest number for the sequence ID

    if state.LastSequenceID != -1 {

    for state.LastSequenceID >= sequenceID {

    sequenceID += 1

    }

    }

    // Assign a new sequence ID to the request message object.

    request.SequenceID = sequenceID

    // Save ReqMsgs to its logs.

    state.MsgLogs.ReqMsg = request

    // Get the digest of the request message

    digest, err := digest(request)

    if err != nil {

    fmt.Println(err)

    return nil, err

    }

    // Change the stage to pre-prepared.

    state.CurrentStage = PrePrepared

    return &PrePrepareMsg{

    ViewID: state.ViewID,

    SequenceID: sequenceID,

    Digest: digest,

    RequestMsg: request,

    }, nil

    }

    然后我是领导者节点,收到请求,我就把请求变成vote信息。然后变成preprepare状态,看上图

    func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {

    // Get ReqMsgs and save it to its logs like the primary.

    state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg

    // Verify if v, n(a.k.a. sequenceID), d are correct.

    if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {

    return nil, errors.New("pre-prepare message is corrupted")

    }

    // Change the stage to pre-prepared.

    state.CurrentStage = PrePrepared

    return &VoteMsg{

    ViewID: state.ViewID,

    SequenceID: prePrepareMsg.SequenceID,

    Digest: prePrepareMsg.Digest,

    MsgType: PrepareMsg,

    }, nil

    }

    这时候向其他节点发送VOTE信息

    func (state *State) Prepare(prepareMsg *VoteMsg) (*VoteMsg, error){

    if !state.verifyMsg(prepareMsg.ViewID, prepareMsg.SequenceID, prepareMsg.Digest) {

    return nil, errors.New("prepare message is corrupted")

    }

    // Append msg to its logs

    state.MsgLogs.PrepareMsgs[prepareMsg.NodeID] = prepareMsg

    // Print current voting status

    fmt.Printf("[Prepare-Vote]: %d\n", len(state.MsgLogs.PrepareMsgs))

    if state.prepared() {

    // Change the stage to prepared.

    state.CurrentStage = Prepared

    return &VoteMsg{

    ViewID: state.ViewID,

    SequenceID: prepareMsg.SequenceID,

    Digest: prepareMsg.Digest,

    MsgType: CommitMsg,

    }, nil

    }

    return nil, nil

    }

    里面有verfi 和if prepared()函数

    也就是确认一下信息是否正确和是否有三分之二的投票信息。

    func (state *State) prepared() bool {

    if state.MsgLogs.ReqMsg == nil {

    return false

    }

    if len(state.MsgLogs.PrepareMsgs) < 2*f {

    return false

    }

    return true

    }

    func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {

    // Wrong view. That is, wrong configurations of peers to start the consensus.

    if state.ViewID != viewID {

    return false

    }

    // Check if the Primary sent fault sequence number. => Faulty primary.

    // TODO: adopt upper/lower bound check.

    if state.LastSequenceID != -1 {

    if state.LastSequenceID >= sequenceID {

    return false

    }

    }

    digest, err := digest(state.MsgLogs.ReqMsg)

    if err != nil {

    fmt.Println(err)

    return false

    }

    // Check digest.

    if digestGot != digest {

    return false

    }

    return true

    }

    当有三分之二,state就改成prepare状态了

    接下来也是如此,

    func (state *State) Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error) {

    if !state.verifyMsg(commitMsg.ViewID, commitMsg.SequenceID, commitMsg.Digest) {

    return nil, nil, errors.New("commit message is corrupted")

    }

    // Append msg to its logs

    state.MsgLogs.CommitMsgs[commitMsg.NodeID] = commitMsg

    // Print current voting status

    fmt.Printf("[Commit-Vote]: %d\n", len(state.MsgLogs.CommitMsgs))

    if state.committed() {

    // This node executes the requested operation locally and gets the result.

    result := "Executed"

    // Change the stage to prepared.

    state.CurrentStage = Committed

    return &ReplyMsg{

    ViewID: state.ViewID,

    Timestamp: state.MsgLogs.ReqMsg.Timestamp,

    ClientID: state.MsgLogs.ReqMsg.ClientID,

    Result: result,

    }, state.MsgLogs.ReqMsg, nil

    }

    return nil, nil, nil

    }

    func (state *State) committed() bool {

    if !state.prepared() {

    return false

    }

    if len(state.MsgLogs.CommitMsgs) < 2*f {

    return false

    }

    return true

    }

    说一下这个digest就是转换成json格式方便在网络传输

    func digest(object interface{}) (string, error) {

    msg, err := json.Marshal(object)

    if err != nil {

    return "", err

    }

    return Hash(msg), nil

    }

    以上就是pbft的全部,是不是少了点什么呢,当然了还少了网络传输部分。下一篇继续

    相关文章

      网友评论

          本文标题:pbft

          本文链接:https://www.haomeiwen.com/subject/ryqwyqtx.html