下面,我们用golang来实现一个基本的分布式系统,它有如下功能:
1、能够发送/接收请求和响应
2、能够连接到集群
3、如果无法连接到集群,(如果它是第一个节点),则可以作为主节点启动
4、每个节点有唯一的标识
5、能够在节点之间交互json数据包
6、接受命令行参数中的所有信息(将来在我们系统升级时有很大作用)
闲话少表,开始。
打开GoLand,新建一个项目BaseDApp,记住这个项目名字,后边会用到。
新建main.go文件,编写代码:
package main
import (
"fmt"
"strconv"
"time"
"math/rand"
"net"
"flag" //系统标准库,flag包实现了命令行参数的解析,具体用法,自行google
"strings"
"encoding/json"
)
type NodeInfo struct {
// 节点ID,通过随机数生成
NodeId int `json:"nodeId"`
//节点IP地址
NodeIpAddr string `json:"nodeIpAddr"`
//节点端口
Port string `json:"port"`
}
// 将节点数据信息格式化输出
// NodeInfo:{nodeId: 89423,nodeIpAddr: 127.0.0.1/8,port: 8001}
func (node *NodeInfo) String() string{
return "NodeInfo:{ nodeId:" + strconv.Itoa(node.NodeId) + ", nodeIpAddr:" + node.NodeIpAddr + ", port:" + node.Port + " }"
}
// 添加一个节点到集群的一个请求或者响应的标准格式
type AddToClusterMessage struct {
//源节点
Source NodeInfo `json:"source"`
//目的节点
Dest NodeInfo `json:"dest"`
//两个节点连接时发送的消息
Message string `json:"message"`
}
// Request/Response 信息格式化输出
func (req AddToClusterMessage) String() string {
return "AddToClusterMessage:{\n source:" + req.Source.String() + ",\n dest: " + req.Dest.String() + ",\n message:" + req.Message + " }"
}
func main() {
// 后边,我们将使用命令行,来动态地设置节点的ip地址、端口等信息,所以需要用到解析命令行参数的 flag 包
// 将命令行中的参数 makeMasterOnError 所 对应的bool值,赋给 makeMasterOnError 变量,如果命令行中不设置该参数,则默认值为false
makeMasterOnError := flag.Bool("makeMasterOnError", false, "如果IP地址没有连接到集群中,我们将其作为Master节点。")
// 将命令行中的参数 clusterip 所 对应的string值,赋给 clusterip 变量,如果命令行中不设置该参数,则默认值为127.0.0.1:8001
clusterip := flag.String("clusterip", "127.0.0.1:8001", "任何的节点连接都连接这个IP")
// 将命令行中的参数 myport 所 对应的string值,赋给 myport 变量,如果命令行中不设置该参数,则默认值为8001
myport := flag.String("myport", "8001", "ip address to run this node on. default is 8001")
// 开始解析命令行参数
flag.Parse()
// 打印命令行参数
fmt.Println("当前命令行参数 makeMasterOnError : ", *makeMasterOnError)
fmt.Println("当前命令行参数 clusterip : ", *clusterip)
fmt.Println("当前命令行参数 myport : ", *myport)
// 为节点生成ID
rand.Seed(time.Now().UTC().UnixNano()) //生成随机数之前,要先设置种子
myid := rand.Intn(99999999) //随机数
fmt.Println("随机生成 myid : ", myid)
// 获取本机 ip 地址
myip, _ := net.InterfaceAddrs()
fmt.Println("获取本机 myip : ", myip[0])
// 创建 NodeInfo 结构体对象 me,即本机节点
me := NodeInfo{myid,myip[0].String(),*myport}
fmt.Println("创建 me 节点: ",me.String())
//根据命令行中设置的集群ip地址参数 cluserip,来创建一个目标节点
dest := NodeInfo{-1,strings.Split(*clusterip,":")[0],strings.Split(*clusterip,":")[1]}
fmt.Println("创建 dest 节点: ",dest.String())
fmt.Println("尝试将 me 连接到集群 dest")
// 尝试将 me 连接到集群 dest,在已连接的情况下并且向集群发送请求
ableToConnect := connectToCluster(me, dest)
// 监听其他节点将要加入到集群的请求
// ableToConnect 连接成功
// (!ableToConnect && *makeMasterOnError) 如果me是集群中第一个启动的节点,me想去连接dest,是肯定会失败的,所以将me作为master节点。
if ableToConnect || (!ableToConnect && *makeMasterOnError) {
if *makeMasterOnError {
fmt.Println("me 符合成为master节点的条件,集群正式建立")
}
//果然 me 成功地连接到集群dest,则监听me上的端口,看是否有消息
listenOnPort(me)
}else {
fmt.Printf("me 连接 dest 失败,请在命令行中设置 makeMasterOnError 参数 为 true ,以便将 me 节点 id = %v 设为 master 节点.\n",myid)
}
}
// 这是发送请求时格式化json包用的工具,这是非常重要的,如果不经过数据格式化,你最终发送的将是空白消息
// 利用节点的具体信息,生成一个 AddToClusterMessage 结构体对象
func getAddToClusterMessage(source NodeInfo, dest NodeInfo, message string) (AddToClusterMessage) {
return AddToClusterMessage{
Source : NodeInfo{
NodeId : source.NodeId,
NodeIpAddr : source.NodeIpAddr,
Port : source.Port,
},
Dest : NodeInfo{
NodeId : dest.NodeId,
NodeIpAddr : dest.NodeIpAddr,
Port : dest.Port,
},
Message : message,
}
}
/*
encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
*/
// 尝试将 me 连接到集群 dest
func connectToCluster(me NodeInfo, dest NodeInfo) (bool) {
// 建立 TCP Socket 连接,此处使用超时机制的DialTimeout 方法
connOut,err := net.DialTimeout("tcp", dest.NodeIpAddr + ":" + dest.Port, time.Duration(10) * time.Second)
if err != nil {
if _, ok := err.(net.Error); ok {
fmt.Printf("节点 me = %v 未连接到集群. %v\n", me.NodeId,err)
}
}else {
fmt.Printf("节点 me = %v 成功连接到集群,发送消息到节点.\n", me.NodeId)
text := "Hi Fairy.. 请添加我到集群中..."
requestMessage := getAddToClusterMessage(me,dest,text)
json.NewEncoder(connOut).Encode(&requestMessage) //将 结构体AddToClusterMessage对象 转化成 json 字符串,写入connOut中
//下面这四行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
decoder := json.NewDecoder(connOut)
var responseMessage AddToClusterMessage
decoder.Decode(&responseMessage)
fmt.Println("得到数据响应:\n" + responseMessage.String())
return true
}
return false
}
/*
encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
*/
func listenOnPort(me NodeInfo) {
// 监听即将到来的消息
ln, _ := net.Listen("tcp", fmt.Sprint(":" + me.Port))
// 接受连接
for {
connIn,err := ln.Accept()
if err != nil {
if _,ok := err.(net.Error); ok {
fmt.Println("Error received while listening", me.NodeId)
}
}else {
// 将接收到的 json 数据,转化为 AddToClusterMessage 结构体对象
//下面这三行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
var requestMessage AddToClusterMessage
json.NewDecoder(connIn).Decode(&requestMessage)
fmt.Println("Got request:\n" + requestMessage.String())
// 回复
text := "OK God.. 我马上把你加入集群中.."
responseMessage := getAddToClusterMessage(me,requestMessage.Source, text)
json.NewEncoder(connIn).Encode(&responseMessage)
connIn.Close()
}
}
}
代码写好后,该运行看看效果了,这里我们使用命令行工具。
1、如下图,点击Terminal,启动命令行。
1.png
2、启动成功。
2.png
3、执行命令,安装BaseDApp(输入 go install BaseDApp) ,并运行BaseDApp(输入 BaseDApp)
3.png
4、重新运行BaseDApp,并设置makeMasterOnError参数为true。
4.png
5、从命令行界面得知,此时程序并没有结束,光标一闪一闪的,me节点正在监听。
6、打开Mac自带的终端,因为我们之前已经在GoLand自带的Terminal中,执行go install BaseDApp命令,安装了BaseDApp,所以此时在系统终端中,可以直接运行BaseDApp项目,输入BaseDApp -clusterip 127.0.0.1:8001 -myport 8002 ,解释一下,clusterip指的是,目的节点的ip地址,即代码中的dest,myport指的是,me节点用来跟其他节点通信的端口,makeMasterOnError没写,使用默认值false(代码中规定了)。
5.png
7、由图5可知,系统终端代表的节点,成功的连接到了GoLand自带的Terminal代表的节点。
8、再次打开一个Mac自带的终端,输入BaseDApp -clusterip 127.0.0.1:8002 -myport 8003,让第二个系统终端代表的节点,去连接第一个系统终端代表的节点。
6.png
9、总之,你可以通过修改命令行参数,连接不同的节点。
网友评论