54. 心跳的实现

作者: 厚土火焱 | 来源:发表于2017-09-17 11:36 被阅读289次

    在多客户端同时访问服务器的工作模式下,首先要保证服务端的运行正常。因此,Server在和Client建立通讯后,确保连接的及时断开就非常重要。否则,多个客户端长时间占用着连接不关闭,是非常可怕的服务器资源浪费。会使得服务器可服务的客户端数量大幅度减少。
    因此,针对短连接和长连接,根据业务的需要,配套不同的处理机制。

    短连接

    一般建立完连接,就立刻传输数据。传输完数据,连接就关闭。服务端根据需要,设定连接的时长。超过时间长度,就算客户端超时。立刻关闭连接。

    长连接

    建立连接后,传输数据,然后要保持连接,然后再次传输数据。直到连接关闭。


    socket 读写可以通过 SetDeadline、SetReadDeadline、SetWriteDeadline设置阻塞的时间。

    func (*IPConn) SetDeadline  
    func (c *IPConn) SetDeadline(t time.Time) error  
      
    func (*IPConn) SetReadDeadline  
    func (c *IPConn) SetReadDeadline(t time.Time) error  
      
    func (*IPConn) SetWriteDeadline  
    func (c *IPConn) SetWriteDeadline(t time.Time) error 
    

    如果做短连接,直接在 Server 端的连接上设置SetReadDeadline。当你设置的时限到达,无论客户端是否还在继续传递信息,服务端都不会再接收。并且已经关闭连接。

    func main() {
        server := ":7373"
        netListen, err := net.Listen("tcp", server)
        if err != nil{
            Log("connect error: ", err)
            os.Exit(1)
        }
        Log("Waiting for Client ...")
        for{
            conn, err := netListen.Accept()
            if err != nil{
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                continue
            }
    
            //设置短连接(10秒)
            conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))
    
            Log(conn.RemoteAddr().String(), "connect success!")
            ...
        }
    }
    

    这就可以了。在这段代码中,每当10秒中的时限一到,连接就终止了。
    这个很容易做到。而我们重点要讲的是,在长连接中如何做到超时控制。
    根据业务需要,客户端可能需要长时间保持连接。但是服务端不能无限制的保持。这就需要一个机制,如果超过某个时间长度,服务端没有获得客户端的数据,就判定客户端已经不需要连接了(比如客户端挂掉了)。
    做到这个,需要一个心跳机制。在限定的时间内,客户端给服务端发送一个指定的消息,以便服务端知道客户端还活着。

    func sender(conn *net.TCPConn) {
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
        for i := 0; i < 2 ; i++ {
            time.Sleep(12 * time.Second)
        }
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
    
    }
    

    这段客户端代码,实现了两个相同的信息发送频率给服务端。两个频率中间,我们让运行休息了12秒。
    然后,我们在服务端的对应机制是这样的。

    func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
        select {
        case fk := <- bytes:
            Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
            conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
            break
    
            case <- time.After(5 * time.Second):
                Log("conn dead now")
                conn.Close()
        }
    }
    

    每次接收到心跳数据就 SetDeadline 延长一个时间段 timeout。如果没有接到心跳数据,5秒后连接关闭。
    服务端完整代码示例

    /**
    * MyHeartbeatServer
    * @Author:  Jian Junbo
    * @Email:   junbojian@qq.com
    * @Create:  2017/9/16 14:02
    * Copyright (c) 2017 Jian Junbo All rights reserved.
    *
    * Description:  
    */
    package main
    
    import (
        "net"
        "fmt"
        "os"
        "time"
    )
    
    func main() {
        server := ":7373"
        netListen, err := net.Listen("tcp", server)
        if err != nil{
            Log("connect error: ", err)
            os.Exit(1)
        }
        Log("Waiting for Client ...")
        for{
            conn, err := netListen.Accept()
            if err != nil{
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                continue
            }
    
            //设置短连接(10秒)
            conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))
    
            Log(conn.RemoteAddr().String(), "connect success!")
            go handleConnection(conn)
    
        }
    }
    func handleConnection(conn net.Conn) {
        buffer := make([]byte, 1024)
        for {
            n, err := conn.Read(buffer)
            if err != nil {
                Log(conn.RemoteAddr().String(), " Fatal error: ", err)
                return
            }
    
            Data := buffer[:n]
            message := make(chan byte)
            //心跳计时
            go HeartBeating(conn, message, 4)
            //检测每次是否有数据传入
            go GravelChannel(Data, message)
    
            Log(time.Now().Format("2006-01-02 15:04:05.0000000"), conn.RemoteAddr().String(), string(buffer[:n]))
        }
    
        defer conn.Close()
    }
    func GravelChannel(bytes []byte, mess chan byte) {
        for _, v := range bytes{
            mess <- v
        }
        close(mess)
    }
    func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
        select {
        case fk := <- bytes:
            Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
            conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
            break
    
            case <- time.After(5 * time.Second):
                Log("conn dead now")
                conn.Close()
        }
    }
    func Log(v ...interface{}) {
        fmt.Println(v...)
        return
    }
    
    

    客户端完整代码示例

    /**
    * MyHeartbeatClient
    * @Author:  Jian Junbo
    * @Email:   junbojian@qq.com
    * @Create:  2017/9/16 14:21
    * Copyright (c) 2017 Jian Junbo All rights reserved.
    *
    * Description:  
    */
    package main
    
    import (
        "net"
        "fmt"
        "os"
        "strconv"
        "time"
    )
    
    func main() {
        server := "127.0.0.1:7373"
    
        tcpAddr, err := net.ResolveTCPAddr("tcp4",server)
        if err != nil{
            Log(os.Stderr,"Fatal error:",err.Error())
            os.Exit(1)
        }
        conn, err := net.DialTCP("tcp",nil,tcpAddr)
        if err != nil{
            Log("Fatal error:",err.Error())
            os.Exit(1)
        }
        Log(conn.RemoteAddr().String(), "connection succcess!")
    
        sender(conn)
        Log("send over")
    }
    func sender(conn *net.TCPConn) {
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
        for i := 0; i < 2 ; i++ {
            time.Sleep(12 * time.Second)
        }
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
    
    }
    func Log(v ...interface{}) {
        fmt.Println(v...)
        return
    }
    
    

    服务端运行效果是这样的


    服务端只接收了第一次循环数据

    服务端根据5秒原则,在客户端第一个循环间歇的12秒的时间内,关闭了连接。

    客户端运行效果是这样的


    客户端完全不知道服务端已经关闭了连接,还是继续发送了第二次循环。

    这样做服务端确实达到了不占用过多资源的目的。但是对客户端来说不够友好。友好的方式,是每次消息往来都执行三次握手的模式。
    当然,具体是否采用,需要根据业务场景分析确定。

    相关文章

      网友评论

        本文标题:54. 心跳的实现

        本文链接:https://www.haomeiwen.com/subject/guzusxtx.html