美文网首页
2018-05-15 建立一个基本的分布式系统

2018-05-15 建立一个基本的分布式系统

作者: fairy冉冉 | 来源:发表于2018-05-16 08:56 被阅读0次

    下面,我们用golang来实现一个基本的分布式系统,它有如下功能:
    1、能够发送/接收请求和响应
    2、能够连接到集群
    3、如果无法连接到集群,(如果它是第一个节点),则可以作为主节点启动
    4、每个节点有唯一的标识
    5、能够在节点之间交互json数据包
    6、接受命令行参数中的所有信息(将来在我们系统升级时有很大作用)

    闲话少表,开始。
    打开GoLand,新建一个项目BaseDApp,记住这个项目名字,后边会用到。
    新建main.go文件,编写代码:

    package main
    
    import (
        "fmt"
        "strconv"
        "time"
        "math/rand"
        "net"
        "flag" //系统标准库,flag包实现了命令行参数的解析,具体用法,自行google
        "strings"
        "encoding/json"
    )
    
    type NodeInfo struct {
        // 节点ID,通过随机数生成
        NodeId int `json:"nodeId"`
        
        //节点IP地址
        NodeIpAddr string `json:"nodeIpAddr"`
    
        //节点端口
        Port string `json:"port"`
    }
    
    // 将节点数据信息格式化输出
    // NodeInfo:{nodeId: 89423,nodeIpAddr: 127.0.0.1/8,port: 8001}
    func (node *NodeInfo) String() string{
        return "NodeInfo:{ nodeId:" + strconv.Itoa(node.NodeId) + ", nodeIpAddr:" + node.NodeIpAddr + ", port:" + node.Port + " }"
    }
    
    // 添加一个节点到集群的一个请求或者响应的标准格式
    type AddToClusterMessage struct {
        //源节点
        Source NodeInfo `json:"source"`
    
        //目的节点
        Dest NodeInfo `json:"dest"`
    
        //两个节点连接时发送的消息
        Message string `json:"message"`
    }
    
    // Request/Response 信息格式化输出
    func (req AddToClusterMessage) String() string {
        return "AddToClusterMessage:{\n source:" + req.Source.String() + ",\n dest: " + req.Dest.String() + ",\n message:" + req.Message + " }"
    }
    
    func main() {
    
        // 后边,我们将使用命令行,来动态地设置节点的ip地址、端口等信息,所以需要用到解析命令行参数的 flag 包
    
        // 将命令行中的参数 makeMasterOnError 所 对应的bool值,赋给 makeMasterOnError 变量,如果命令行中不设置该参数,则默认值为false
        makeMasterOnError := flag.Bool("makeMasterOnError", false, "如果IP地址没有连接到集群中,我们将其作为Master节点。")
    
        // 将命令行中的参数 clusterip 所 对应的string值,赋给 clusterip 变量,如果命令行中不设置该参数,则默认值为127.0.0.1:8001
        clusterip := flag.String("clusterip", "127.0.0.1:8001", "任何的节点连接都连接这个IP")
    
        // 将命令行中的参数 myport 所 对应的string值,赋给 myport 变量,如果命令行中不设置该参数,则默认值为8001
        myport := flag.String("myport", "8001", "ip address to run this node on. default is 8001")
    
        // 开始解析命令行参数
        flag.Parse()
    
        // 打印命令行参数
        fmt.Println("当前命令行参数 makeMasterOnError : ", *makeMasterOnError)
        fmt.Println("当前命令行参数 clusterip : ", *clusterip)
        fmt.Println("当前命令行参数 myport : ", *myport)
    
        // 为节点生成ID
        rand.Seed(time.Now().UTC().UnixNano())     //生成随机数之前,要先设置种子
        myid := rand.Intn(99999999)            //随机数
        fmt.Println("随机生成 myid : ", myid)
    
        // 获取本机 ip 地址
        myip, _ := net.InterfaceAddrs()
        fmt.Println("获取本机 myip : ", myip[0])
    
        // 创建 NodeInfo 结构体对象 me,即本机节点
        me := NodeInfo{myid,myip[0].String(),*myport}
        fmt.Println("创建 me 节点: ",me.String())
    
        //根据命令行中设置的集群ip地址参数 cluserip,来创建一个目标节点
        dest := NodeInfo{-1,strings.Split(*clusterip,":")[0],strings.Split(*clusterip,":")[1]}
        fmt.Println("创建 dest 节点: ",dest.String())
    
        fmt.Println("尝试将 me 连接到集群 dest")
        // 尝试将 me 连接到集群 dest,在已连接的情况下并且向集群发送请求
        ableToConnect := connectToCluster(me, dest)
    
        // 监听其他节点将要加入到集群的请求
        // ableToConnect 连接成功
        // (!ableToConnect && *makeMasterOnError) 如果me是集群中第一个启动的节点,me想去连接dest,是肯定会失败的,所以将me作为master节点。
        if ableToConnect || (!ableToConnect && *makeMasterOnError) {
            if *makeMasterOnError {
                fmt.Println("me 符合成为master节点的条件,集群正式建立")
            }
    
            //果然 me 成功地连接到集群dest,则监听me上的端口,看是否有消息
            listenOnPort(me)
        }else {
            fmt.Printf("me 连接 dest 失败,请在命令行中设置 makeMasterOnError 参数 为 true ,以便将 me 节点 id = %v 设为 master 节点.\n",myid)
    
        }
    
    }
    
    
    //  这是发送请求时格式化json包用的工具,这是非常重要的,如果不经过数据格式化,你最终发送的将是空白消息
    //  利用节点的具体信息,生成一个 AddToClusterMessage 结构体对象
    func getAddToClusterMessage(source NodeInfo, dest NodeInfo, message string) (AddToClusterMessage) {
        return AddToClusterMessage{
            Source : NodeInfo{
                NodeId : source.NodeId,
                NodeIpAddr : source.NodeIpAddr,
                Port : source.Port,
            },
            Dest : NodeInfo{
                NodeId : dest.NodeId,
                NodeIpAddr : dest.NodeIpAddr,
                Port : dest.Port,
            },
            Message : message,
        }
    }
    
    
    /*
        encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
        在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
     */
    //  尝试将 me 连接到集群 dest
    func connectToCluster(me NodeInfo, dest NodeInfo) (bool)  {
    
        // 建立 TCP Socket 连接,此处使用超时机制的DialTimeout 方法
        connOut,err := net.DialTimeout("tcp", dest.NodeIpAddr + ":" + dest.Port, time.Duration(10) * time.Second)
    
        if err != nil {
            if _, ok := err.(net.Error); ok {
                fmt.Printf("节点 me = %v 未连接到集群. %v\n", me.NodeId,err)
            }
        }else {
            fmt.Printf("节点 me = %v 成功连接到集群,发送消息到节点.\n", me.NodeId)
            text := "Hi Fairy.. 请添加我到集群中..."
            requestMessage := getAddToClusterMessage(me,dest,text)
            json.NewEncoder(connOut).Encode(&requestMessage) //将 结构体AddToClusterMessage对象 转化成 json 字符串,写入connOut中
    
            //下面这四行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
            decoder := json.NewDecoder(connOut)
            var responseMessage AddToClusterMessage
            decoder.Decode(&responseMessage)
            fmt.Println("得到数据响应:\n" + responseMessage.String())
    
            return true
        }
    
        return false
    }
    
    
    /*
        encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
        在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
     */
    func listenOnPort(me NodeInfo)  {
        // 监听即将到来的消息
        ln, _ := net.Listen("tcp", fmt.Sprint(":" + me.Port))
    
        // 接受连接
        for {
            connIn,err := ln.Accept()
            if err != nil {
                 if _,ok := err.(net.Error); ok {
                    fmt.Println("Error received while listening", me.NodeId)
                 }
            }else {
                // 将接收到的 json 数据,转化为 AddToClusterMessage 结构体对象
                //下面这三行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
                var requestMessage AddToClusterMessage
                json.NewDecoder(connIn).Decode(&requestMessage)
                fmt.Println("Got request:\n" + requestMessage.String())
    
                // 回复
                text := "OK God.. 我马上把你加入集群中.."
                responseMessage := getAddToClusterMessage(me,requestMessage.Source, text)
                json.NewEncoder(connIn).Encode(&responseMessage)
                connIn.Close()
            }
        }
    
    }
    

    代码写好后,该运行看看效果了,这里我们使用命令行工具。
    1、如下图,点击Terminal,启动命令行。


    1.png

    2、启动成功。


    2.png

    3、执行命令,安装BaseDApp(输入 go install BaseDApp) ,并运行BaseDApp(输入 BaseDApp)


    3.png

    4、重新运行BaseDApp,并设置makeMasterOnError参数为true。


    4.png

    5、从命令行界面得知,此时程序并没有结束,光标一闪一闪的,me节点正在监听。

    6、打开Mac自带的终端,因为我们之前已经在GoLand自带的Terminal中,执行go install BaseDApp命令,安装了BaseDApp,所以此时在系统终端中,可以直接运行BaseDApp项目,输入BaseDApp -clusterip 127.0.0.1:8001 -myport 8002 ,解释一下,clusterip指的是,目的节点的ip地址,即代码中的dest,myport指的是,me节点用来跟其他节点通信的端口,makeMasterOnError没写,使用默认值false(代码中规定了)。


    5.png

    7、由图5可知,系统终端代表的节点,成功的连接到了GoLand自带的Terminal代表的节点。

    8、再次打开一个Mac自带的终端,输入BaseDApp -clusterip 127.0.0.1:8002 -myport 8003,让第二个系统终端代表的节点,去连接第一个系统终端代表的节点。


    6.png

    9、总之,你可以通过修改命令行参数,连接不同的节点。

    相关文章

      网友评论

          本文标题:2018-05-15 建立一个基本的分布式系统

          本文链接:https://www.haomeiwen.com/subject/gkeudftx.html