美文网首页设计模式
手撸golang GO与微服务 ChatServer之2

手撸golang GO与微服务 ChatServer之2

作者: 老罗话编程 | 来源:发表于2021-03-08 09:40 被阅读0次

    缘起

    最近阅读《Go微服务实战》(刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    案例需求(聊天服务器)

    • 用户可以连接到服务器。
    • 用户可以设定自己的用户名。
    • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

    目标

    • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
    • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
    • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

    设计

    • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
    • IMsgDecoder: 定义消息解码器及其实现
    • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
    • tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
    • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
    • tChatServer: 实现聊天服务器IChatServer

    单元测试

    ChatServer_test.go

    package chat_server
    
    import (
        "fmt"
        cs "learning/gooop/chat_server"
        "strings"
        "testing"
        "time"
    )
    
    // Test_ChatServer starts a chat server on a fixed local port, connects
    // several clients that each announce a name and then send three chat
    // messages one second apart, closes everything after five seconds, and
    // finally checks the server's collected logs for the expected connect and
    // disconnect entries.
    func Test_ChatServer(t *testing.T) {
        fnAssertTrue := func(b bool, msg string) {
            if !b {
                t.Fatal(msg)
            }
        }

        port := 3333
        server := cs.NewChatServer()
        err := server.Open(port)
        if err != nil {
            t.Fatal(err)
        }

        clientCount := 3
        address := fmt.Sprintf("localhost:%v", port)
        for i := 0; i < clientCount; i++ {
            err, client := cs.DialChatClient(address)
            if err != nil {
                t.Fatal(err)
            }

            id := fmt.Sprintf("c%02d", i)
            client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
                t.Logf("%v recv: %v\n", id, msg)
            })

            go func() {
                client.SetName(id)
                client.Send(&cs.NameMsg{Name: id})

                // A stoppable ticker is used instead of time.Tick so the timer
                // is released as soon as this goroutine is done with it.
                ticker := time.NewTicker(time.Duration(1) * time.Second)
                defer ticker.Stop()

                // One chat message per tick, three in total.
                for n := 0; n < 3; n++ {
                    <-ticker.C
                    client.Send(&cs.ChatMsg{Name: id, Words: fmt.Sprintf("msg %02d from %v", n, id)})
                }

                client.Close()
            }()
        }

        // Let the clients run for five seconds, logging each elapsed second.
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for passedSeconds := 1; passedSeconds <= 5; passedSeconds++ {
            <-ticker.C
            t.Logf("%v seconds passed", passedSeconds)
        }
        server.Close()

        logs := server.GetLogs()
        fnHasLog := func(log string) bool {
            for _, it := range logs {
                if strings.Contains(it, log) {
                    return true
                }
            }
            return false
        }

        // Every client must have produced one connect and one disconnect entry.
        for i := 0; i < clientCount; i++ {
            msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1)
            fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

            msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
            fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)
        }
    }
    

    测试输出

    $ go test -v ChatServer_test.go 
    === RUN   Test_ChatServer
    tChatServer.handleIncomingConn, clientCount=1
    tChatServer.handleIncomingConn, clientCount=2
    tChatServer.handleIncomingConn, clientCount=3
        ChatServer_test.go:59: 1 seconds passed
        ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
        ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
        ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
        ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
        ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
        ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
        ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
        ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
        ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
        ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
        ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
        ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
        ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
        ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
        ChatServer_test.go:59: 2 seconds passed
        ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
        ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
        ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
    tChatClient.postConnClosed, c00, serverFlag=false
    tChatClient.postConnClosed, c02, serverFlag=false
    tChatClient.postConnClosed, c01, serverFlag=false
    tChatClient.postConnClosed, c02, serverFlag=true
    tChatServer.handleClientClosed, c02
    tChatServer.handleClientClosed, c02, clientCount=2
    tChatClient.postConnClosed, c01, serverFlag=true
    tChatServer.handleClientClosed, c01
    tChatServer.handleClientClosed, c01, clientCount=1
        ChatServer_test.go:59: 3 seconds passed
    tChatClient.postConnClosed, c00, serverFlag=true
    tChatServer.handleClientClosed, c00
    tChatServer.handleClientClosed, c00, clientCount=0
        ChatServer_test.go:59: 4 seconds passed
        ChatServer_test.go:59: 5 seconds passed
    --- PASS: Test_ChatServer (5.00s)
    PASS
    ok      command-line-arguments  5.003s
    

    IMsg.go

    定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码

    package chat_server
    
    import (
        "encoding/base64"
        "fmt"
    )
    
    // IMsg is the interface every chat message type implements.
    type IMsg interface {
        // Encode returns the message rendered as a string.
        Encode() string
    }
    
    // NameMsg carries a user name.
    type NameMsg struct {
        Name string
    }

    // Encode renders the message as the single line "NAME <name>\n", where
    // <name> is the standard base64 encoding of Name.
    func (me *NameMsg) Encode() string {
        encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
        return fmt.Sprintf("NAME %s\n", encodedName)
    }
    
    // ChatMsg carries one line of chat text together with the sender's name.
    type ChatMsg struct {
        Name string
        Words string
    }

    // Encode renders the message as the single line "CHAT <name> <words>\n",
    // where both fields are standard base64 encoded.
    func (me *ChatMsg) Encode() string {
        encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
        encodedWords := base64.StdEncoding.EncodeToString([]byte(me.Words))
        return fmt.Sprintf("CHAT %s %s\n", encodedName, encodedWords)
    }
    

    IMsgDecoder.go

    定义消息解码器及其实现

    package chat_server
    
    import (
        "encoding/base64"
        "strings"
    )
    
    
    // IMsgDecoder turns one received line of text back into a message.
    type IMsgDecoder interface {
        // Decode parses line; the bool result reports whether a message was
        // recognized, and the IMsg result is the decoded message.
        Decode(line string) (bool, IMsg)
    }
    
    // tMsgDecoder is the stateless IMsgDecoder implementation.
    type tMsgDecoder struct {
    }

    // Decode splits line on single spaces and recognizes two shapes:
    //   - "NAME <b64 name>"          -> *NameMsg
    //   - "CHAT <b64 name> <b64 words>" -> *ChatMsg
    // It returns (false, nil) for any other shape or when a field is not valid
    // standard base64. A trailing newline on the last field is tolerated
    // because base64 decoding ignores "\r" and "\n".
    func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
        fields := strings.Split(line, " ")

        switch {
        case fields[0] == "NAME" && len(fields) == 2:
            rawName, err := base64.StdEncoding.DecodeString(fields[1])
            if err != nil {
                return false, nil
            }
            return true, &NameMsg{Name: string(rawName)}

        case fields[0] == "CHAT" && len(fields) == 3:
            rawName, nameErr := base64.StdEncoding.DecodeString(fields[1])
            rawWords, wordsErr := base64.StdEncoding.DecodeString(fields[2])
            if nameErr != nil || wordsErr != nil {
                return false, nil
            }
            return true, &ChatMsg{Name: string(rawName), Words: string(rawWords)}
        }

        return false, nil
    }

    // MsgDecoder is the shared decoder instance.
    var MsgDecoder = &tMsgDecoder{}
    

    IChatClient.go

    定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

    package chat_server
    
    // IChatClient is one end of a chat connection.
    type IChatClient interface {
        // GetName returns the name currently associated with the client.
        GetName() string
        // SetName replaces the name associated with the client.
        SetName(name string)
    
        // Send hands msg to the client for delivery to the peer.
        Send(msg IMsg)
        // RecvHandler registers the callback invoked for each received message.
        RecvHandler(handler ClientRecvFunc)
        // CloseHandler registers the callback invoked when the connection closes.
        CloseHandler(handler ClientCloseFunc)
    
        // Close shuts the client down.
        Close()
    }
    
    // ClientRecvFunc is the callback signature for a message received by client.
    type ClientRecvFunc func(client IChatClient, msg IMsg)
    // ClientCloseFunc is the callback signature for a client whose connection closed.
    type ClientCloseFunc func(client IChatClient)
    

    tChatClient.go

    聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

    package chat_server
    
    import (
        "bufio"
        "fmt"
        "io"
        "net"
        "sync/atomic"
        "time"
    )
    
    // tChatClient implements IChatClient on top of a net.Conn, with one
    // goroutine writing queued messages and one reading incoming lines.
    type tChatClient struct {
        // conn is the underlying connection.
        conn net.Conn
        // name is the client's display name; openChatClient starts it as "anonymous".
        name string
        // openFlag is flipped 0 -> 1 by open so the goroutines start only once.
        openFlag int32
        // closeFlag is 0 while open, 1 once closeConn has requested shutdown,
        // and 2 after the write loop has closed the connection.
        closeFlag int32
        // serverFlag is the value passed to openChatClient: true for clients
        // created by the server for an accepted connection, false for dialed ones.
        serverFlag bool
    
        // closeChan tells the write loop to shut the connection down.
        closeChan chan bool
        // sendChan queues outgoing messages for the write loop.
        sendChan chan IMsg
    
        // sendLogs records messages written to the connection without error.
        sendLogs []IMsg
        // dropLogs records messages Send discarded instead of queueing.
        dropLogs []IMsg
        // recvLogs records messages successfully decoded by the read loop.
        recvLogs []IMsg
        // pendingSend counts messages queued in sendChan but not yet taken by the write loop.
        pendingSend int32
    
        // recvHandler is called by the read loop for each decoded message.
        recvHandler ClientRecvFunc
        // closeHandler is called once after the connection has been closed.
        closeHandler ClientCloseFunc
    }
    
    // gMaxPendingSend is the capacity of each client's send queue and the
    // limit Send compares the pending-message count against.
    var gMaxPendingSend int32 = 100
    
    // DialChatClient opens a TCP connection to address and wraps it in a
    // client-side chat client. Note the result order: the error comes first.
    // On dial failure it returns (err, nil).
    func DialChatClient(address string) (error, IChatClient) {
        conn, dialErr := net.Dial("tcp", address)
        if dialErr == nil {
            return nil, openChatClient(conn, false)
        }

        return dialErr, nil
    }
    
    // openChatClient wraps conn in a tChatClient named "anonymous", with an
    // unbuffered close channel and a send queue of gMaxPendingSend entries,
    // starts its goroutines via open, and returns it. serverFlag is stored on
    // the client as given.
    func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
        client := new(tChatClient)

        // openFlag and closeFlag keep their zero values.
        client.conn = conn
        client.name = "anonymous"
        client.serverFlag = serverFlag

        client.closeChan = make(chan bool)
        client.sendChan = make(chan IMsg, gMaxPendingSend)

        client.sendLogs = []IMsg{}
        client.dropLogs = []IMsg{}
        client.recvLogs = []IMsg{}

        client.open()
        return client
    }
    
    
    // GetName returns the client's current name.
    func (me *tChatClient) GetName() string {
        return me.name
    }
    
    // SetName replaces the client's name. It only updates the local field;
    // nothing is sent to the peer.
    func (me *tChatClient) SetName(name string) {
        me.name = name
    }
    
    // open starts the write and read goroutines. Only the first call has any
    // effect: the 0 -> 1 swap on openFlag guards against a second start.
    func (me *tChatClient) open() {
        if atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
            go me.beginWrite()
            go me.beginRead()
        }
    }
    
    
    // isClosed reports whether shutdown has been requested or completed, i.e.
    // closeFlag is non-zero. The flag is modified with atomic operations on
    // other goroutines, so it is read atomically here as well.
    func (me *tChatClient) isClosed() bool {
        return atomic.LoadInt32(&me.closeFlag) != 0
    }
    
    // isNotClosed is the negation of isClosed.
    func (me *tChatClient) isNotClosed() bool {
        return !me.isClosed()
    }
    
    // Send queues msg for the write loop. It never blocks: if the client is
    // closed the message is ignored, and if the send queue is full the message
    // is recorded in dropLogs instead of being queued.
    //
    // The queue's own capacity is used as the limit (a non-blocking channel
    // send) rather than a separate check of pendingSend, because a check
    // followed by a blocking send can stall the caller forever once the queue
    // is full or the write loop has exited.
    func (me *tChatClient) Send(msg IMsg) {
        if me.isClosed() {
            return
        }

        // Count the message before it becomes visible to the write loop so the
        // counter cannot be decremented below zero; undo the count on a drop.
        atomic.AddInt32(&me.pendingSend, 1)
        select {
        case me.sendChan <- msg:
        default:
            atomic.AddInt32(&me.pendingSend, -1)
            // NOTE(review): dropLogs is appended without synchronization;
            // concurrent Send callers would race on it.
            me.dropLogs = append(me.dropLogs, msg)
        }
    }
    
    // RecvHandler installs the callback for received messages. The call is
    // ignored once the client is closed.
    func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
        if me.isClosed() {
            return
        }
        me.recvHandler = handler
    }
    
    
    // CloseHandler installs the callback run after the connection has closed.
    // The call is ignored once the client is closed.
    func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
        if me.isClosed() {
            return
        }
        me.closeHandler = handler
    }
    
    
    // Close requests shutdown of the connection; it does nothing if shutdown
    // was already requested.
    func (me *tChatClient) Close() {
        if me.isClosed() {
            return
        }
        me.closeConn()
    }
    
    // beginWrite is the client's write loop. Each iteration waits for one of:
    //   - a close signal: close the connection, mark closeFlag = 2, run the
    //     close notification and exit;
    //   - a queued message: write its encoded form to the connection;
    //   - 10 seconds without either of the above: treat it as a timeout.
    func (me *tChatClient) beginWrite() {
        writer := io.Writer(me.conn)
        for {
            select {
            case <-me.closeChan:
                _ = me.conn.Close()
                // closeFlag is read and swapped atomically elsewhere, so it is
                // stored atomically here too.
                atomic.StoreInt32(&me.closeFlag, 2)
                me.postConnClosed()
                return

            case msg := <-me.sendChan:
                atomic.AddInt32(&me.pendingSend, -1)
                if _, e := writer.Write([]byte(msg.Encode())); e != nil {
                    // This goroutine is the only receiver of closeChan, so the
                    // close request must not be issued synchronously from here:
                    // it would wait for a receive that only this loop performs.
                    go me.closeConn()
                } else {
                    me.sendLogs = append(me.sendLogs, msg)
                }

            case <-time.After(time.Duration(10) * time.Second):
                // Same reasoning as above: the timeout handler ends up
                // requesting a close, so it runs on its own goroutine.
                go me.postRecvTimeout()
            }
        }
    }
    
    // postRecvTimeout prints a timeout notice and requests that the connection
    // be closed. It is invoked by the write loop when its 10-second timer
    // fires before a close signal or an outgoing message arrives.
    func (me *tChatClient) postRecvTimeout() {
        fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)
        me.closeConn()
    }
    
    // beginRead is the client's read loop. It reads newline-terminated lines
    // from the connection and decodes each one. For every line that decodes it
    // calls the receive handler (if one is set) and then records the message
    // in recvLogs; lines that do not decode are skipped. Any read error
    // requests a close and ends the loop.
    func (me *tChatClient) beginRead() {
        reader := bufio.NewReader(me.conn)
        for {
            line, err := reader.ReadString('\n')
            if err != nil {
                me.closeConn()
                return
            }

            ok, msg := MsgDecoder.Decode(line)
            if !ok {
                continue
            }

            if fn := me.recvHandler; fn != nil {
                fn(me, msg)
            }
            me.recvLogs = append(me.recvLogs, msg)
        }
    }
    
    // closeConn requests shutdown exactly once: the first caller flips
    // closeFlag from 0 to 1 and signals the write loop; later callers return
    // immediately.
    //
    // The signal is delivered by closing closeChan rather than sending on it.
    // closeChan is unbuffered, so a send would block until the write loop
    // receives, and would block forever when the caller is the write loop
    // itself or when no receiver is left. Closing never blocks, and the 0 -> 1
    // swap guarantees the channel is closed only once.
    func (me *tChatClient) closeConn() {
        if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
            return
        }
        close(me.closeChan)
    }
    
    // postConnClosed prints a closed notice, invokes the close handler if one
    // is registered, and then clears both handlers.
    func (me *tChatClient) postConnClosed() {
        fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)

        if notify := me.closeHandler; notify != nil {
            notify(me)
        }

        me.closeHandler, me.recvHandler = nil, nil
    }
    

    IChatServer.go

    定义聊天服务器接口, 为方便测试, 提供日志采集方法

    package chat_server
    
    // IChatServer is the chat server's public interface.
    type IChatServer interface {
        // Open starts listening on the given TCP port.
        Open(port int) error
        // Broadcast sends msg to the connected clients.
        Broadcast(msg IMsg)
        // Close stops the server.
        Close()
        // GetLogs returns the log lines the server has collected.
        GetLogs() []string
    }
    

    tChatServer.go

    实现聊天服务器IChatServer

    package chat_server
    
    import (
        "errors"
        "fmt"
        "net"
        "sync"
        "sync/atomic"
    )
    
    // tChatServer implements IChatServer: it accepts TCP connections, keeps one
    // chat client per connection, and collects its own log lines in memory.
    type tChatServer struct {
        // openFlag is flipped 0 -> 1 by Open.
        openFlag int32
        // closeFlag is flipped 0 -> 1 by Close.
        closeFlag int32
    
        // clients holds the connected clients; freed slots are set to nil.
        clients []IChatClient
        // clientCount is the number of currently connected clients.
        clientCount int
        // clientLock guards clients and clientCount.
        clientLock *sync.RWMutex
    
        // listener is the listening socket created by Open.
        listener net.Listener
        // recvLogs records every message received from any client.
        recvLogs []IMsg
    
        // logs holds the lines written through logf.
        logs []string
    }
    
    // NewChatServer returns a server that is not yet listening: no clients, a
    // fresh lock, and both state flags at zero. Call Open to start it.
    func NewChatServer() IChatServer {
        server := new(tChatServer)

        // openFlag, closeFlag, clientCount and listener keep their zero values.
        server.clients = []IChatClient{}
        server.clientLock = &sync.RWMutex{}
        server.recvLogs = []IMsg{}

        return server
    }
    
    // Open starts listening for TCP connections on the given port and launches
    // the accept loop in a goroutine.
    //
    // It returns an error if the server has already been opened, or the
    // listen error if the port cannot be bound. In the latter case the opened
    // flag is reset so that Open may be called again; otherwise a failed
    // attempt would make every retry report "server already opened".
    func (me *tChatServer) Open(port int) error {
        if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
            return errors.New("server already opened")
        }

        listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
        if err != nil {
            atomic.StoreInt32(&me.openFlag, 0)
            return err
        }

        me.listener = listener
        go me.beginListening()
        return nil
    }
    
    // logf formats a message, appends it to the in-memory log returned by
    // GetLogs, and prints it to standard output.
    // NOTE(review): the append is not synchronized inside this method; callers
    // on different goroutines need their own locking — confirm.
    func (me *tChatServer) logf(f string, args... interface{}) {
        msg := fmt.Sprintf(f, args...)
        me.logs = append(me.logs, msg)
        fmt.Println(msg)
    }
    
    // GetLogs returns a snapshot of the log lines collected so far.
    //
    // The snapshot is taken under the client read lock, which is the lock held
    // around the server's connect and disconnect log writes, and it is a copy,
    // so later log writes do not change the returned slice and the caller
    // cannot modify the server's log.
    func (me *tChatServer) GetLogs() []string {
        me.clientLock.RLock()
        defer me.clientLock.RUnlock()

        return append([]string(nil), me.logs...)
    }
    
    // isClosed reports whether Close has been called, i.e. closeFlag is
    // non-zero. Close flips the flag with an atomic swap while the accept loop
    // polls it from another goroutine, so it is read atomically here.
    func (me *tChatServer) isClosed() bool {
        return atomic.LoadInt32(&me.closeFlag) != 0
    }
    
    // isNotClosed is the negation of isClosed.
    func (me *tChatServer) isNotClosed() bool {
        return !me.isClosed()
    }
    
    // beginListening is the accept loop. Until the server is closed it accepts
    // connections one at a time and registers each as a client. An accept
    // error closes the server and ends the loop.
    func (me *tChatServer) beginListening() {
        for {
            if me.isClosed() {
                return
            }

            conn, acceptErr := me.listener.Accept()
            if acceptErr != nil {
                me.Close()
                return
            }

            me.handleIncomingConn(conn)
        }
    }
    
    
    // Close stops the server: it closes the listener and then every connected
    // client. Only the first call has any effect.
    //
    // The listener is nil when Open was never called or failed, so it is
    // checked before use; calling Close on such a server is a no-op apart
    // from marking it closed.
    func (me *tChatServer) Close() {
        if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
            return
        }

        if me.listener != nil {
            // The listener's close error is deliberately ignored: the server
            // is shutting down and there is nothing further to do with it.
            _ = me.listener.Close()
        }
        me.closeAllClients()
    }
    
    // closeAllClients closes every registered client, clears its slot, and
    // resets the client count to zero, all under the client write lock.
    func (me *tChatServer) closeAllClients() {
        me.clientLock.Lock()
        defer me.clientLock.Unlock()

        for idx := range me.clients {
            client := me.clients[idx]
            if client == nil {
                continue
            }

            client.Close()
            me.clients[idx] = nil
        }
        me.clientCount = 0
    }
    
    
    // handleIncomingConn wraps a freshly accepted connection in a server-side
    // chat client, wires the server's message and close callbacks to it, and
    // registers it under the client write lock. The client is stored in the
    // first free slot (index clientCount) when the slice already has room,
    // otherwise it is appended.
    func (me *tChatServer) handleIncomingConn(conn net.Conn) {
        client := openChatClient(conn, true)
        client.RecvHandler(me.handleClientMsg)
        client.CloseHandler(me.handleClientClosed)

        me.clientLock.Lock()
        defer me.clientLock.Unlock()

        if slot := me.clientCount; slot < len(me.clients) {
            me.clients[slot] = client
        } else {
            me.clients = append(me.clients, client)
        }
        me.clientCount++

        me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
    }
    
    // handleClientMsg is the receive callback installed on every server-side
    // client. It records the message, then either applies a NameMsg to the
    // sending client or broadcasts a ChatMsg to all clients. Other message
    // types are only recorded.
    // NOTE(review): recvLogs is appended without locking; if several clients'
    // read loops call this concurrently the appends would race — confirm.
    func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
        me.recvLogs = append(me.recvLogs, msg)

        switch typed := msg.(type) {
        case *NameMsg:
            client.SetName(typed.Name)
        case *ChatMsg:
            me.Broadcast(msg)
        }
    }
    
    // handleClientClosed is the close callback installed on every server-side
    // client. It logs the disconnect, removes the client from the registry and
    // logs the new client count.
    //
    // Both log lines are written while holding the client write lock: logf
    // appends to the shared log slice, and this callback runs on each client's
    // own goroutine, so logging before taking the lock would race with other
    // clients and with the accept loop.
    //
    // Removal keeps the live clients packed at the front of the slice: the
    // last live client is moved into the freed slot and the last slot is
    // cleared. If the registry is already empty (for example after the server
    // closed all clients) only the first log line is written.
    func (me *tChatServer) handleClientClosed(client IChatClient) {
        me.clientLock.Lock()
        defer me.clientLock.Unlock()

        me.logf("tChatServer.handleClientClosed, %s", client.GetName())

        if me.clientCount <= 0 {
            return
        }

        lastI := me.clientCount - 1
        for i, it := range me.clients {
            if it == client {
                if i != lastI {
                    me.clients[i] = me.clients[lastI]
                }
                me.clients[lastI] = nil
                me.clientCount--
                break
            }
        }

        me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
    }
    
    // Broadcast hands msg to every registered client, skipping empty slots.
    // The registry is held under the client read lock for the duration.
    func (me *tChatServer) Broadcast(msg IMsg) {
        me.clientLock.RLock()
        defer me.clientLock.RUnlock()

        for idx := range me.clients {
            if peer := me.clients[idx]; peer != nil {
                peer.Send(msg)
            }
        }
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 ChatServer之2

        本文链接:https://www.haomeiwen.com/subject/cxqpqltx.html