美文网首页
2018-05-24 raft 和rpc结合注册

2018-05-24 raft 和rpc结合注册

作者: fairy冉冉 | 来源:发表于2018-05-24 15:20 被阅读0次
    package main
    
    import (
        "sync"
        "math/rand"
        "fmt"
        "time"
        "log"
        "net/http"
        "net/rpc"
    )
    
    // 设置节点的个数为3
    const raftCount  =3
    // 声明leader
    type AppendEntiresArgs struct {
        Term int// 第几任领导
        leaderId  int//设置领导编号
    
    
    }
    // 创建一个领导对象
    var args   =AppendEntiresArgs{0,-1}
    // 声明节点类型
    type Raft struct {
        // 线程锁
        mu  sync.Mutex
        // 节点编号
        me int
        //当前领导期数
        currentTerm int
        // 为哪个节点投票
        votedFor int
        //当前节点的状态
        state int// 0 跟随者 1候选人 2就是领导
        // 厚厚一天发送消息时间
        lastMessageTime  int64
        // 节点间发送消息的通道
        message  chan bool
        // 选举通道
        eclectCh chan bool
        // 心跳信号通道
        heartBeat chan bool
        // 返回心跳信号
        heartBeatRe chan  bool
        // 超时时间
        timeout  int
    
        // 设置当前节点的领导
      currentLeader int
    }
    // 产生随机值
    func  randRange(min,max int64)int64  {
        return  rand.Int63n(max-min)+min
    }
    // 打印当前时间
    func printTime()  {
        fmt.Printf("%s",time.Now().String())
    }
    
    // 获取当前时间的ms
    func millisecond()int64  {
        return  time.Now().UnixNano()/int64(time.Millisecond)
    }
    
    func  main()  {
        // 创建三个节点
        for i:=0;i<raftCount ;i++  {
            // 创建三个raft
            Make(i)
    
        }
    
        // 对raft类实现rpc注册
        rpc.Register(new(Raft))
        rpc.HandleHTTP()
        err := http.ListenAndServe(":6000",nil)
        if err !=nil {
            log.Fatal(err)
    
        }
    }
    // 通过make函数创建raft节点的对象
    func Make(me int)*Raft  {
        rf :=&Raft{}
        rf.me = me
        rf.votedFor = -1
        rf.state = 0
        rf.timeout =0
        rf.currentLeader = -1
        rf.setTerm(0)
        // 创建通道对象
        rf.eclectCh = make(chan bool)
        rf.message = make(chan  bool)
        rf.heartBeat = make(chan bool)
        rf.heartBeatRe = make(chan bool)
        rand.Seed(time.Now().UnixNano())
    
        go rf.sendLeaderHeartBeat()
        go rf.election()
    
        return rf
    }
    // 创建raft对象的方法
    func  (rf *Raft) setTerm(term int)  {
        rf.currentTerm = term
    
    }
    // 设置节点发送心跳信号的方法
    func (rf *Raft)sendLeaderHeartBeat()  {
       for{
        // 其他节点接收到了leader发射的心跳信号
           select {
           case <-rf.heartBeat:
            rf.snedAppendEntriesImpl()
        }
       }
    
    }
    // 返回给leader确认信号
    func (rf *Raft)snedAppendEntriesImpl()  {
        if rf.currentLeader== rf.me {
            // 返回确认信号的节点个数
            var success_count int  =0i
            //设置返回确认信号的子节点的个数
            for i:=0;i<raftCount ;i++  {
                if i!=rf.me {
                    go func() {
                        //rf.heartBeatRe <-true // 暂时屏蔽掉
                        // 将返回的信号给leader
                        rp,err := rpc.DialHTTP("tcp","127.0.0.1:6000")
                        if err!=nil {
                            log.Fatal(err)
    
                        }
                        // 接收服务器返回的信息
                        var ok bool  =false
                        err1:=rp.Call("Raft.Communication",Param{"abc"},&ok)
                        if err1 !=nil{
                            log.Fatal(err1)
    
                        }
                        if ok {
                            rf.heartBeatRe <-true
                        }
                    }()
    
                }
    
            }
            // 计算返回确信信号子节点的个数大于raftCount/2 才能校验成功
            for i:=0;i<raftCount-1 ;i++  {
                select {
                case ok:= <-rf.heartBeatRe:
                    if ok {
                        success_count ++
                        if success_count>raftCount/2 {
                            fmt.Println("子节点为",rf.me)//0,1,2 循环
                            fmt.Println("投票选举成功,校验心跳信号也成功")
                            log.Fatal("over")// 结束程序进程
    
                        }
                    }
    
                }
    
            }
    
        }
    
    }
    //设置节点投票的方法
    func (rf *Raft)election ()  {
        // 设置标签
        var result bool
        for  {
            // 设置超时时间
            timeout := randRange(150,300)
            // 设置每个节点的时间
            rf.lastMessageTime = millisecond()
            select {
            // 延迟等待
            case <-time.After(time.Duration(timeout)*time.Millisecond):
                fmt.Println(rf.state)// 打印当前节点的状态
    
            }
            result= false
            for !result{
                // 选择谁作为leader
                result = rf.election_one_round(&args)
            }
        }
    }
    // 选择谁为leader
    func (rf *Raft)election_one_round(args *AppendEntiresArgs)bool  {
        var  timeout int64
        var done int
        // 是否开始心跳信号的产生
        var  triggerHeartbeat bool
        last := millisecond()
        timeout = 100
        // 信号变量标签
        success :=false
        // 修改raft节点的状态为candidate候选人状态
        rf.mu.Lock()
        rf.becomeCandidate()
        rf.mu.Unlock()
    
        fmt.Println("start electing leader")
        for  {
            for i:=0;i<raftCount ;i++  {
                if i!= rf.me {
                    //拉选票
                 printTime()
                 go func() {
                     if args.leaderId <0 {
                        // 设置此节点开始选举
                        rf.eclectCh <- true
    
                     }
                 }()
                }
    
            }
            // 设置投票数量
            done =0
            triggerHeartbeat = false
            for i:=0;i<raftCount-1 ;i++  {
                // 计算一下投票的数量
                //raftCount-1 把自己去掉
                select {
                case ok:= <-rf.eclectCh:
                    if ok {
                        done ++
                        success = done>raftCount/2
                        if success && !triggerHeartbeat {
                            // 选举成功了
                            triggerHeartbeat = true
                            rf.mu.Lock()
                            rf.brcomeLeader()
                            rf.mu.Unlock()
                            // 由leader向其他节点发送心跳信号
                            rf.heartBeat <- true
                            fmt.Println("leader发送心跳信号")
    
                        }
    
                    }
    
                }
    
            }
            if(timeout+last <millisecond()||(done>raftCount/2 || rf.currentLeader>-1)) {
                break
    
            }else {
                select {
                // 延时操作
                case <-time.After(time.Duration(10)*time.Millisecond):
                }
            }
        }
        return success
    }
    func(rf *Raft) becomeCandidate()  {
        rf.state  = 1
        rf.setTerm(rf.currentTerm +1)
        rf.votedFor = rf.me
        rf.currentLeader = -1
    }
    func (rf *Raft)brcomeLeader()  {
        rf.state = 2
        rf.currentLeader = rf.me
    }
    
    // 链接rpc的代码
    
    
    
    // 分布式通信
    type Param struct {
        Msg string
    }
    func(r *Raft)Communication(p Param,a *bool)error{
        fmt.Println(p.Msg)
        *a = true
        return  nil
    }
    

    运行:


    image.png

    1.改动的地方


    image.png

    2.改动的地方


    image.png

    相关文章

      网友评论

          本文标题:2018-05-24 raft 和rpc结合注册

          本文链接:https://www.haomeiwen.com/subject/dqjsjftx.html