Pbft就是超过三分之二的节点同意后进入下一阶段。eos先采用投票的方式选出一些超级节点。然后进行pbft过程。这样同步的速度是很快。却牺牲了区中心化原本的初衷、
![](https://img.haomeiwen.com/i11995049/7a3a90f163302a65.png)
package consensus
type PBFT interface {
StartConsensus(request *RequestMsg) (*PrePrepareMsg, error)
PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error)
Prepare(prepareMsg *VoteMsg) (*VoteMsg, error)
Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error)
}
以上定义了接口,没啥可说的
package consensus
type RequestMsg struct {
Timestamp int64 `json:"timestamp"`
ClientID string `json:"clientID"`
Operation string `json:"operation"`
SequenceID int64 `json:"sequenceID"`
}
type ReplyMsg struct {
ViewID int64 `json:"viewID"`
Timestamp int64 `json:"timestamp"`
ClientID string `json:"clientID"`
NodeID string `json:"nodeID"`
Result string `json:"result"`
}
type PrePrepareMsg struct {
ViewID int64 `json:"viewID"`
SequenceID int64 `json:"sequenceID"`
Digest string `json:"digest"`
RequestMsg *RequestMsg `json:"requestMsg"`
}
type VoteMsg struct {
ViewID int64 `json:"viewID"`
SequenceID int64 `json:"sequenceID"`
Digest string `json:"digest"`
NodeID string `json:"nodeID"`
MsgType `json:"msgType"`
}
type MsgType int
const (
PrepareMsg MsgType = iota
CommitMsg
)
以上定义了消息类型。
package consensus
import (
"crypto/sha256"
"encoding/hex"
)
func Hash(content []byte) string {
h := sha256.New()
h.Write(content)
return hex.EncodeToString(h.Sum(nil))
}
返回。h.sum的十六进制编码,不过得先返回content的sha256hash码
我们再实现以上的接口
package consensus
import (
"encoding/json"
"errors"
"time"
"fmt"
)
type State struct {
ViewID int64
MsgLogs *MsgLogs
LastSequenceID int64
CurrentStage Stage
}
type MsgLogs struct {
ReqMsg *RequestMsg
PrepareMsgs map[string]*VoteMsg
CommitMsgs map[string]*VoteMsg
}
type Stage int
const (
Idle Stage = iota // Node is created successfully, but the consensus process is not started yet.
PrePrepared // The ReqMsgs is processed successfully. The node is ready to head to the Prepare stage.
Prepared // Same with `prepared` stage explained in the original paper.
Committed // Same with `committed-local` stage explained in the original paper.
)
// f: # of Byzantine faulty node
// f = (n1) / 3
// n = 4, in this case.
const f = 1
——————————————————————————————————————————————————————————————
先贴上全部代码
// lastSequenceID will be -1 if there is no last sequence ID.
func CreateState(viewID int64, lastSequenceID int64) *State {
return &State{
ViewID: viewID,
MsgLogs: &MsgLogs{
ReqMsg:nil,
PrepareMsgs:make(map[string]*VoteMsg),
CommitMsgs:make(map[string]*VoteMsg),
},
LastSequenceID: lastSequenceID,
CurrentStage: Idle,
}
}
func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) {
// `sequenceID` will be the index of this message.
sequenceID := time.Now().UnixNano()
// Find the unique and largest number for the sequence ID
if state.LastSequenceID != -1 {
for state.LastSequenceID >= sequenceID {
sequenceID += 1
}
}
// Assign a new sequence ID to the request message object.
request.SequenceID = sequenceID
// Save ReqMsgs to its logs.
state.MsgLogs.ReqMsg = request
// Get the digest of the request message
digest, err := digest(request)
if err != nil {
fmt.Println(err)
return nil, err
}
// Change the stage to pre-prepared.
state.CurrentStage = PrePrepared
return &PrePrepareMsg{
ViewID: state.ViewID,
SequenceID: sequenceID,
Digest: digest,
RequestMsg: request,
}, nil
}
func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {
// Get ReqMsgs and save it to its logs like the primary.
state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg
// Verify if v, n(a.k.a. sequenceID), d are correct.
if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {
return nil, errors.New("pre-prepare message is corrupted")
}
// Change the stage to pre-prepared.
state.CurrentStage = PrePrepared
return &VoteMsg{
ViewID: state.ViewID,
SequenceID: prePrepareMsg.SequenceID,
Digest: prePrepareMsg.Digest,
MsgType: PrepareMsg,
}, nil
}
func (state *State) Prepare(prepareMsg *VoteMsg) (*VoteMsg, error){
if !state.verifyMsg(prepareMsg.ViewID, prepareMsg.SequenceID, prepareMsg.Digest) {
return nil, errors.New("prepare message is corrupted")
}
// Append msg to its logs
state.MsgLogs.PrepareMsgs[prepareMsg.NodeID] = prepareMsg
// Print current voting status
fmt.Printf("[Prepare-Vote]: %d\n", len(state.MsgLogs.PrepareMsgs))
if state.prepared() {
// Change the stage to prepared.
state.CurrentStage = Prepared
return &VoteMsg{
ViewID: state.ViewID,
SequenceID: prepareMsg.SequenceID,
Digest: prepareMsg.Digest,
MsgType: CommitMsg,
}, nil
}
return nil, nil
}
func (state *State) Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error) {
if !state.verifyMsg(commitMsg.ViewID, commitMsg.SequenceID, commitMsg.Digest) {
return nil, nil, errors.New("commit message is corrupted")
}
// Append msg to its logs
state.MsgLogs.CommitMsgs[commitMsg.NodeID] = commitMsg
// Print current voting status
fmt.Printf("[Commit-Vote]: %d\n", len(state.MsgLogs.CommitMsgs))
if state.committed() {
// This node executes the requested operation locally and gets the result.
result := "Executed"
// Change the stage to prepared.
state.CurrentStage = Committed
return &ReplyMsg{
ViewID: state.ViewID,
Timestamp: state.MsgLogs.ReqMsg.Timestamp,
ClientID: state.MsgLogs.ReqMsg.ClientID,
Result: result,
}, state.MsgLogs.ReqMsg, nil
}
return nil, nil, nil
}
func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {
// Wrong view. That is, wrong configurations of peers to start the consensus.
if state.ViewID != viewID {
return false
}
// Check if the Primary sent fault sequence number. => Faulty primary.
// TODO: adopt upper/lower bound check.
if state.LastSequenceID != -1 {
if state.LastSequenceID >= sequenceID {
return false
}
}
digest, err := digest(state.MsgLogs.ReqMsg)
if err != nil {
fmt.Println(err)
return false
}
// Check digest.
if digestGot != digest {
return false
}
return true
}
func (state *State) prepared() bool {
if state.MsgLogs.ReqMsg == nil {
return false
}
if len(state.MsgLogs.PrepareMsgs) < 2*f {
return false
}
return true
}
func (state *State) committed() bool {
if !state.prepared() {
return false
}
if len(state.MsgLogs.CommitMsgs) < 2*f {
return false
}
return true
}
func digest(object interface{}) (string, error) {
msg, err := json.Marshal(object)
if err != nil {
return "", err
}
return Hash(msg), nil
}
—————————————————————————————————————————————————————————————-分割
初始化的时候得创建一个concensus
func CreateState(viewID int64, lastSequenceID int64) *State {
return &State{
ViewID: viewID,
MsgLogs: &MsgLogs{
ReqMsg:nil,
PrepareMsgs:make(map[string]*VoteMsg),
CommitMsgs:make(map[string]*VoteMsg),
},
LastSequenceID: lastSequenceID,
CurrentStage: Idle,
}
}
这个viewid lastseq是出块者的身份证和上一次出块的序列号,就相当于排队领号。这个seq使用Linux时间。
如果我是出块者,那么我得发送一个req请求到领导者节点上。
也就是开始合约:
func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) {
// `sequenceID` will be the index of this message.
sequenceID := time.Now().UnixNano()
// Find the unique and largest number for the sequence ID
if state.LastSequenceID != -1 {
for state.LastSequenceID >= sequenceID {
sequenceID += 1
}
}
// Assign a new sequence ID to the request message object.
request.SequenceID = sequenceID
// Save ReqMsgs to its logs.
state.MsgLogs.ReqMsg = request
// Get the digest of the request message
digest, err := digest(request)
if err != nil {
fmt.Println(err)
return nil, err
}
// Change the stage to pre-prepared.
state.CurrentStage = PrePrepared
return &PrePrepareMsg{
ViewID: state.ViewID,
SequenceID: sequenceID,
Digest: digest,
RequestMsg: request,
}, nil
}
然后我是领导者节点,收到请求,我就把请求变成vote信息。然后变成preprepare状态,看上图
func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {
// Get ReqMsgs and save it to its logs like the primary.
state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg
// Verify if v, n(a.k.a. sequenceID), d are correct.
if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {
return nil, errors.New("pre-prepare message is corrupted")
}
// Change the stage to pre-prepared.
state.CurrentStage = PrePrepared
return &VoteMsg{
ViewID: state.ViewID,
SequenceID: prePrepareMsg.SequenceID,
Digest: prePrepareMsg.Digest,
MsgType: PrepareMsg,
}, nil
}
这时候向其他节点发送VOTE信息
func (state *State) Prepare(prepareMsg *VoteMsg) (*VoteMsg, error){
if !state.verifyMsg(prepareMsg.ViewID, prepareMsg.SequenceID, prepareMsg.Digest) {
return nil, errors.New("prepare message is corrupted")
}
// Append msg to its logs
state.MsgLogs.PrepareMsgs[prepareMsg.NodeID] = prepareMsg
// Print current voting status
fmt.Printf("[Prepare-Vote]: %d\n", len(state.MsgLogs.PrepareMsgs))
if state.prepared() {
// Change the stage to prepared.
state.CurrentStage = Prepared
return &VoteMsg{
ViewID: state.ViewID,
SequenceID: prepareMsg.SequenceID,
Digest: prepareMsg.Digest,
MsgType: CommitMsg,
}, nil
}
return nil, nil
}
里面有verfi 和if prepared()函数
也就是确认一下信息是否正确和是否有三分之二的投票信息。
func (state *State) prepared() bool {
if state.MsgLogs.ReqMsg == nil {
return false
}
if len(state.MsgLogs.PrepareMsgs) < 2*f {
return false
}
return true
}
func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {
// Wrong view. That is, wrong configurations of peers to start the consensus.
if state.ViewID != viewID {
return false
}
// Check if the Primary sent fault sequence number. => Faulty primary.
// TODO: adopt upper/lower bound check.
if state.LastSequenceID != -1 {
if state.LastSequenceID >= sequenceID {
return false
}
}
digest, err := digest(state.MsgLogs.ReqMsg)
if err != nil {
fmt.Println(err)
return false
}
// Check digest.
if digestGot != digest {
return false
}
return true
}
当有三分之二,state就改成prepare状态了
接下来也是如此,
func (state *State) Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error) {
if !state.verifyMsg(commitMsg.ViewID, commitMsg.SequenceID, commitMsg.Digest) {
return nil, nil, errors.New("commit message is corrupted")
}
// Append msg to its logs
state.MsgLogs.CommitMsgs[commitMsg.NodeID] = commitMsg
// Print current voting status
fmt.Printf("[Commit-Vote]: %d\n", len(state.MsgLogs.CommitMsgs))
if state.committed() {
// This node executes the requested operation locally and gets the result.
result := "Executed"
// Change the stage to prepared.
state.CurrentStage = Committed
return &ReplyMsg{
ViewID: state.ViewID,
Timestamp: state.MsgLogs.ReqMsg.Timestamp,
ClientID: state.MsgLogs.ReqMsg.ClientID,
Result: result,
}, state.MsgLogs.ReqMsg, nil
}
return nil, nil, nil
}
func (state *State) committed() bool {
if !state.prepared() {
return false
}
if len(state.MsgLogs.CommitMsgs) < 2*f {
return false
}
return true
}
说一下这个digest就是转换成json格式方便在网络传输
func digest(object interface{}) (string, error) {
msg, err := json.Marshal(object)
if err != nil {
return "", err
}
return Hash(msg), nil
}
以上就是pbft的全部,是不是少了点什么呢,当然了还少了网络传输部分。下一篇继续
网友评论