关于zookeeper的介绍,请大家自行google。本文仅介绍,如何使用go语言版本的zk包来实现一个简单的分布式server。
本文主要分为两部分,第一部分,安装zookeeper服务;第二部分,使用zk包实现分布式server。
第一部分
首先,要在电脑上安装zookeeper,并启动它,本文以Mac电脑为例,使用brew来安装zookeeper。
3、安装成功后,输入命令:cd /usr/local/etc/zookeeper,前往zookeeper文件夹,这个路径,是使用brew 安装 zookeeper
所自动生成的。在这个文件夹里,有如下文件:
其中,zoo.cfg 文件,就是缺省配置文件。使用cat 命令查看该文件:
4.png
其中,dataDir顾名思义就是Zookeeper保存数据的目录,默认情况下Zookeeper将写数据的日志文件也保存在这个目录里。但我们一般会设置一个dataLogDir目录,用来保存日志,这将极大的提升ZK性能。
而clientPort这个端口就是客户端(应用程序)连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求。
更多的参数,请参考:http://www.cnblogs.com/xiohao/p/5541093.html
4、在我们这个简单的例子中,配置文件不需要额外的修改,保持原状即可。
6、输入:zkServer status 查看当前zookeeper的运行状态,发现zookeeper服务尚未启动: 6.png
7、输入:zkServer start 启动服务,再次输入 zkServer status 查看状态: 8.png
8、至此,zookeeper服务成功启动。此外,zookeeper还提供了一个客户端命令行工具zkCli ,供开发人员使用。
9、输入:zkCli ,连接zookeeper server:
10、zkCli 连接成功后,输入:h,查看 zkCli 所提供的各类操作: 10.png
11、输入:ls / 查看根目录下的节点: 11.png
12、zookeeper中,节点分为四种类型:持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL): 12.png 由图10可知,新建节点的命令格式为:create [-s] [-e] path data acl。解释一下,-s代表顺序,-e代表临时,这两个参数是可选的,不写的话,默认为持久节点。path是节点路径,路径必须以根目录/开头。data是节点所关联的数据。acl是节点的操作权限。
现在我们新建一个持久节点fairy。输入:create /fairy,发现没有创建成功,尝试输入:create /fairy 123,关联了数据,创建成功。 13.png
接着我们创建一个持久顺序节点ranran,发现系统自动地在ranran后面加上了一长串数字: 14.png
注意!!!后面用代码实现分布式server时,添加节点,使用了ip地址加port端口,作为节点的名字。这样命名的节点,不能是顺序节点,否则就会出现类似127.0.0.1:88990000000006这样的节点,客户端连接这样的节点的时候,就会提示端口号无效。
第二部分
直接上代码,分为3个文件,zkutil、server、client。
zkutil,工具类,封装了第三方zk包,提供接口供 server 和 client 使用
package example
import (
"fmt"
zookeeper "github.com/samuel/go-zookeeper/zk" //为了方便,给第三方包定义了一个别名 zookeeper
"time"
)
//创建zkutil文件,通过调用第三方zk包的某些方法,从而为我们自己本身的项目MyZookeeper提供一些基本的方法
//可以理解为zkutil 把 第三方 zk 给封装了一层
//const 定义常量
const (
timeOut = 20 //20s还未响应,则认为请求超时
)
//服务器ip列表
var hosts []string = []string{"127.0.0.1:2181"} // the zk server list
//连接服务器
func GetConnect() (conn *zookeeper.Conn, err error) {
//调用第三方zk包的Connect方法,判断连接服务器是否成功,成功的话,返回一个连接Conn
conn, _, err = zookeeper.Connect(hosts, timeOut*time.Second)
if err != nil {
fmt.Println("服务器连接失败",err)
}
return
}
//注册一个节点
func RegistServer(conn *zookeeper.Conn, host string) (err error) {
//调用第三方zk包的Create方法,往zookeeper的 fairy 节点下(咱们在第一部分的时候所创建的节点),新增一个服务器节点,本例中,用的host
//作为节点名字,
_, err = conn.Create("/fairy/"+host, nil, 0, zookeeper.WorldACL(zookeeper.PermAll))
return
}
//获取fairy节点下,所包含的服务器节点列表
func GetServerList(conn *zookeeper.Conn) (list []string, err error) {
list, _, err = conn.Children("/fairy")
return
}
server,服务器端
package main
import (
"fmt"
"MyZookeeper/example"
"net"
"os"
"time"
)
func main() {
//在本机上,开辟三个不同端口,模拟三台服务器设备,然后将这三台服务器加入到zookeeper中,作为服务器节点
go starServer("127.0.0.1:4444")
go starServer("127.0.0.1:5555")
go starServer("127.0.0.1:6666")
a := make(chan bool, 1)
<-a
}
//启动服务器
func starServer(port string) {
// 创建一个TCP服务端
tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
checkError(err,"新建一个tcp连接")
fmt.Println("TCP : ",tcpAddr)
// 监听端口
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err,"监听端口")
// 连接 zookeeper 服务,只有连接成功了,才能注册
conn, err := example.GetConnect()
if err != nil {
fmt.Printf("连接 zookeeper 失败: %s\n",err)
}else{
fmt.Println("连接 zookeeper 成功")
}
defer conn.Close()
err = example.RegistServer(conn, port)
if err != nil {
fmt.Printf("%s 注册节点失败: %s \n",port,err)
}else {
fmt.Printf("%s 注册节点成功 \n",port)
}
// 死循环的处理客户端请求
for {
// 等待客户的连接,注意这里是无法并发处理多个请求的
conn, err := listener.Accept()
// 如果有错误直接跳过
if err != nil {
fmt.Fprintf(os.Stderr, "本次监听,失败: %s", err)
continue
}
// 开启一个协程,向客户端发送数据,并关闭连接
go handleCient(conn, port)
}
}
func handleCient(conn net.Conn, port string) {
defer conn.Close()
// 读取接收到的 client 信息
bs := make([] byte,512)
m,_ := conn.Read(bs)
fmt.Println("收到 client 的 :",string(bs[:m]))
daytime := time.Now().String()
conn.Write([]byte("我是节点 " + port + ",此时为:" + daytime))
}
func checkError(err error, desc string) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
os.Exit(1)
}
fmt.Println(desc,"成功")
}
client,客户端
package main
import (
"errors"
"fmt"
"MyZookeeper/example"
"io/ioutil"
"math/rand"
"net"
"os"
"time"
)
func main() {
for i := 0; i < 10; i++ {
fmt.Println("i :",i)
startClient()
time.Sleep(1 * time.Second)
}
}
func startClient() {
// 客户端,先去获取一个服务器地址,然后才能与之进行tcp通讯
serverHost, err := getServerHost()
if err != nil {
fmt.Printf("获取某个服务器ip地址,失败: %s \n", err)
return
}
fmt.Println("随机选择出来的服务器为 : " + serverHost)
//模拟 client 请求 serverHost 服务器
//ResolveTCPAddr用于获取一个TCPAddr
tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
checkError(err,"获取一个TCPAddr")
// 建立一个TCP连接,连接到刚选择出来的服务器
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err,"client建立一个TCP连接,连接到服务器")
defer conn.Close()
// 向tcpconn中写入数据
_, err = conn.Write([]byte("hello,I'm a client ..."))
checkError(err,"向tcpconn中写入数据")
// 读取tcpconn中的数据
result, err := ioutil.ReadAll(conn)
checkError(err,"读取tcpconn中的数据")
fmt.Println("tcpconn中的数据 : ",string(result))
return
}
func getServerHost() (host string, err error) {
// 连接 zookeeper 服务,只有连接成功了,才能获取服务器列表
conn, err := example.GetConnect()
if err != nil {
fmt.Printf("连接 zookeeper 失败: %s\n",err)
return
}else{
fmt.Println("连接 zookeeper 成功")
}
defer conn.Close()
//获取fairy节点下,所包含的服务器节点列表
serverList, err := example.GetServerList(conn)
if err != nil {
fmt.Printf("获取fairy节点下,所包含的服务器节点列表,失败: %s \n", err)
return
}else{
fmt.Println("获取fairy节点下,所包含的服务器节点列表,成功")
}
count := len(serverList)
if count == 0 {
err = errors.New("fairy节点下,所包含的服务器节点列表是空的 \n")
return
}
//随机选中一个服务器返回
fmt.Println("从获取到的服务器节点列表中,随机选择一个,去响应客户端")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
host = serverList[r.Intn(3)]
return
}
func checkError(err error, desc string) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
os.Exit(1)
}
fmt.Println(desc,"成功")
}
代码写好后,我们先运行server端代码,注册节点;接着再运行client端代码,查看连接情况:
网友评论