美文网首页
2018-05-14 zookeeper实现分布式系统

2018-05-14 zookeeper实现分布式系统

作者: fairy冉冉 | 来源:发表于2018-05-17 08:59 被阅读0次

    关于zookeeper的介绍,请大家自行google。本文仅介绍,如何使用go语言版本的zk包来实现一个简单的分布式server。
    本文主要分为两部分,第一部分,安装zookeeper服务;第二部分,使用zk包实现分布式server。

    第一部分

    首先,要在电脑上安装zookeeper,并启动它,本文以Mac电脑为例,使用brew来安装zookeeper。

    1、打开终端,输入命令 :brew info zookeeper,查看zookeeper的相关信息。 1.png 2、输入命令:brew install zookeeper,自动安装。 2.png

    3、安装成功后,输入命令:cd /usr/local/etc/zookeeper,前往zookeeper文件夹,这个路径,是使用brew 安装 zookeeper
    所自动生成的。在这个文件夹里,有如下文件:

    3.png
    其中,zoo.cfg 文件,就是缺省配置文件。使用cat 命令查看该文件:
    4.png
    其中,dataDir顾名思义就是Zookeeper保存数据的目录,默认情况下Zookeeper将写数据的日志文件也保存在这个目录里。但我们一般会设置一个dataLogDir目录,用来保存日志,这将极大的提升ZK性能。
    而clientPort这个端口就是客户端(应用程序)连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求。
    更多的参数,请参考:http://www.cnblogs.com/xiohao/p/5541093.html

    4、在我们这个简单的例子中,配置文件不需要额外的修改,保持原状即可。

    5、此时,zookeeper已经安装成功,输入:zkServer,可查看zookeeper提供的操作命令: 5.png
    6、输入:zkServer status 查看当前zookeeper的运行状态,发现zookeeper服务尚未启动: 6.png
    7、输入:zkServer start 启动服务,再次输入 zkServer status 查看状态: 8.png

    8、至此,zookeeper服务成功启动。此外,zookeeper还提供了一个客户端命令行工具zkCli ,供开发人员使用。
    9、输入:zkCli ,连接zookeeper server:

    9.png
    10、zkCli 连接成功后,输入:h,查看 zkCli 所提供的各类操作: 10.png
    11、输入:ls / 查看根目录下的节点: 11.png
    12、zookeeper中,节点分为四种类型:持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL): 12.png 由图10可知,新建节点的命令格式为:create [-s] [-e] path data acl。解释一下,-s代表顺序,-e代表临时,这两个参数是可选的,不写的话,默认为持久节点。path是节点路径,路径必须以根目录/开头。data是节点所关联的数据。acl是节点的操作权限。
    现在我们新建一个持久节点fairy。输入:create /fairy,发现没有创建成功,尝试输入:create /fairy 123,关联了数据,创建成功。 13.png
    接着我们创建一个持久顺序节点ranran,发现系统自动地在ranran后面加上了一长串数字: 14.png
    注意!!!后面用代码实现分布式server时,添加节点,使用了ip地址加port端口,作为节点的名字。这样命名的节点,不能是顺序节点,否则就会出现类似127.0.0.1:88990000000006这样的节点,客户端连接这样的节点的时候,就会提示端口号无效。

    第二部分

    直接上代码,分为3个文件,zkutil、server、client。

    zkutil,工具类,封装了第三方zk包,提供接口供 server 和 client 使用
    package example
    
    import (
        "fmt"
        zookeeper "github.com/samuel/go-zookeeper/zk"  //为了方便,给第三方包定义了一个别名 zookeeper
        "time"
    )
    
    
    //创建zkutil文件,通过调用第三方zk包的某些方法,从而为我们自己本身的项目MyZookeeper提供一些基本的方法
    //可以理解为zkutil 把 第三方 zk 给封装了一层
    
    //const 定义常量
    const (
        timeOut = 20  //20s还未响应,则认为请求超时
    )
    
    //服务器ip列表
    var hosts []string = []string{"127.0.0.1:2181"} // the zk server list
    
    
    //连接服务器
    func GetConnect() (conn *zookeeper.Conn, err error) {
    
        //调用第三方zk包的Connect方法,判断连接服务器是否成功,成功的话,返回一个连接Conn
        conn, _, err = zookeeper.Connect(hosts, timeOut*time.Second)
        if err != nil {
            fmt.Println("服务器连接失败",err)
        }
        return
    }
    
    //注册一个节点
    func RegistServer(conn *zookeeper.Conn, host string) (err error) {
    
        //调用第三方zk包的Create方法,往zookeeper的 fairy 节点下(咱们在第一部分的时候所创建的节点),新增一个服务器节点,本例中,用的host
        //作为节点名字,
        _, err = conn.Create("/fairy/"+host, nil, 0, zookeeper.WorldACL(zookeeper.PermAll))
        return
    }
    
    //获取fairy节点下,所包含的服务器节点列表
    func GetServerList(conn *zookeeper.Conn) (list []string, err error) {
        list, _, err = conn.Children("/fairy")
        return
    }
    
    
    server,服务器端
    package main
    
    import (
        "fmt"
        "MyZookeeper/example"
        "net"
        "os"
        "time"
    )
    
    func main() {
    
        //在本机上,开辟三个不同端口,模拟三台服务器设备,然后将这三台服务器加入到zookeeper中,作为服务器节点
        go starServer("127.0.0.1:4444")
        go starServer("127.0.0.1:5555")
        go starServer("127.0.0.1:6666")
    
        a := make(chan bool, 1)
        <-a
    }
    
    //启动服务器
    func starServer(port string) {
    
        // 创建一个TCP服务端
        tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
        checkError(err,"新建一个tcp连接")
        fmt.Println("TCP : ",tcpAddr)
    
        // 监听端口
        listener, err := net.ListenTCP("tcp", tcpAddr)
        checkError(err,"监听端口")
    
        // 连接 zookeeper 服务,只有连接成功了,才能注册
        conn, err := example.GetConnect()
        if err != nil {
            fmt.Printf("连接 zookeeper 失败: %s\n",err)
        }else{
            fmt.Println("连接 zookeeper 成功")
        }
    
    
        defer conn.Close()
    
        err = example.RegistServer(conn, port)
        if err != nil {
            fmt.Printf("%s 注册节点失败: %s \n",port,err)
        }else {
            fmt.Printf("%s 注册节点成功 \n",port)
        }
    
        // 死循环的处理客户端请求
        for {
    
            // 等待客户的连接,注意这里是无法并发处理多个请求的
            conn, err := listener.Accept()
    
            // 如果有错误直接跳过
            if err != nil {
                fmt.Fprintf(os.Stderr, "本次监听,失败: %s", err)
                continue
            }
    
    
            // 开启一个协程,向客户端发送数据,并关闭连接
            go handleCient(conn, port)
        }
    
    }
    
    func handleCient(conn net.Conn, port string) {
        defer conn.Close()
    
        // 读取接收到的 client 信息
        bs := make([] byte,512)
        m,_ := conn.Read(bs)
        fmt.Println("收到 client 的 :",string(bs[:m]))
    
        daytime := time.Now().String()
        conn.Write([]byte("我是节点 " + port + ",此时为:" + daytime))
    }
    
    func checkError(err error, desc string) {
        if err != nil {
            fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
            os.Exit(1)
        }
        fmt.Println(desc,"成功")
    }
    
    
    client,客户端
    package main
    
    import (
        "errors"
        "fmt"
        "MyZookeeper/example"
        "io/ioutil"
        "math/rand"
        "net"
        "os"
        "time"
    )
    
    func main() {
        for i := 0; i < 10; i++ {
            fmt.Println("i :",i)
            startClient()
            time.Sleep(1 * time.Second)
        }
    }
    
    func startClient() {
    
        // 客户端,先去获取一个服务器地址,然后才能与之进行tcp通讯
        serverHost, err := getServerHost()
        if err != nil {
            fmt.Printf("获取某个服务器ip地址,失败: %s \n", err)
            return
        }
    
        fmt.Println("随机选择出来的服务器为 : " + serverHost)
    
        //模拟 client 请求 serverHost 服务器
        //ResolveTCPAddr用于获取一个TCPAddr
        tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
        checkError(err,"获取一个TCPAddr")
    
        // 建立一个TCP连接,连接到刚选择出来的服务器
        conn, err := net.DialTCP("tcp", nil, tcpAddr)
        checkError(err,"client建立一个TCP连接,连接到服务器")
    
        defer conn.Close()
    
        // 向tcpconn中写入数据
        _, err = conn.Write([]byte("hello,I'm a client ..."))
        checkError(err,"向tcpconn中写入数据")
    
        // 读取tcpconn中的数据
        result, err := ioutil.ReadAll(conn)
        checkError(err,"读取tcpconn中的数据")
        fmt.Println("tcpconn中的数据 : ",string(result))
    
        return
    }
    
    func getServerHost() (host string, err error) {
    
        // 连接 zookeeper 服务,只有连接成功了,才能获取服务器列表
        conn, err := example.GetConnect()
        if err != nil {
            fmt.Printf("连接 zookeeper 失败: %s\n",err)
            return
        }else{
            fmt.Println("连接 zookeeper 成功")
        }
    
    
        defer conn.Close()
    
        //获取fairy节点下,所包含的服务器节点列表
        serverList, err := example.GetServerList(conn)
        if err != nil {
            fmt.Printf("获取fairy节点下,所包含的服务器节点列表,失败: %s \n", err)
            return
        }else{
            fmt.Println("获取fairy节点下,所包含的服务器节点列表,成功")
        }
    
        count := len(serverList)
        if count == 0 {
            err = errors.New("fairy节点下,所包含的服务器节点列表是空的 \n")
            return
        }
    
        //随机选中一个服务器返回
        fmt.Println("从获取到的服务器节点列表中,随机选择一个,去响应客户端")
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        host = serverList[r.Intn(3)]
        return
    }
    
    func checkError(err error, desc string) {
        if err != nil {
            fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
            os.Exit(1)
        }
        fmt.Println(desc,"成功")
    }
    

    代码写好后,我们先运行server端代码,注册节点;接着再运行client端代码,查看连接情况:

    1、运行server端: 14.png 2、运行client端: 15.png 3、回到server终端界面查看: 16.png

    相关文章

      网友评论

          本文标题:2018-05-14 zookeeper实现分布式系统

          本文链接:https://www.haomeiwen.com/subject/pgbsdftx.html