美文网首页网络
golang socket连接复用 - smux

golang socket连接复用 - smux

作者: 写个代码容易么 | 来源:发表于2020-06-09 16:01 被阅读0次

    今天来介绍一个socket连接复用的包
    https://github.com/xtaci/smux

    如图所示,多个channel输入通过smux合并在一个连接中,后端服务将连接中的channel分离出来进行处理

    smux.jpg

    场景分析

    假设一个简单的使用场景,一个apiservice网关服务对外提供HTTP接口,后面还有一个rand随机数服务,对内提供随机数TCP接口。

    客户端访问apiservice接口,apiservice连接randservice服务获取数据并返回。如果不做多路复用的话,apiservice和randservice之间的连接数就是客户端请求数,这样apiservice和randservice之间连接过多会导致性能问题。

                   n link                   n link
    +-----------+          +-------------+           +---------------+
    |           <---------->             <----------->               |
    |  client   <---------->  apiservice <----------->  randservice  |
    |           <---------->             <----------->               |
    +-----------+          +-------------+           +---------------+
    
    

    经过多路复用后,apiservice和randservice之间只有一个连接,这样无论多少个客户端请求都不会导致连接过多问题。

                   n link                   1 link
    +-----------+          +-------------+           +---------------+
    |           <---------->             |           |               |
    |  client   <---------->  apiservice <----------->  randservice  |
    |           <---------->             |           |               |
    +-----------+          +-------------+           +---------------+
    
    

    (当然这只是个示例场景而已,生产中apiservice和randservice之间使用RPC框架即可,不用我们手动写socket通信)

    代码示例

    1.随机数服务 randservice.go

    package main
    
    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "github.com/xtaci/smux"
        "math/rand"
        "net"
        "runtime"
        "time"
    )
    
    func init() {
        rand.Seed(time.Now().UnixNano())
    }
    
    /**
    一个生成随机数的tcp服务
    客户端发送'R', 'A', 'N', 'D',服务返回一个随机数
    */
    func main() {
        listener, err := net.Listen("tcp", ":9000")
        if err != nil {
            panic(err)
        }
        log.Info().Msg("随机数服务启动,监听9000端口")
        defer listener.Close()
        for {
            conn, err := listener.Accept()
            if err != nil {
                fmt.Println(err.Error())
                continue
            }
            go SessionHandler(conn)
        }
    }
    
    /**
    处理会话
    每个tcp连接生成一个会话session
    */
    func SessionHandler(conn net.Conn) {
        session, err := smux.Server(conn, nil)
        if err != nil {
            panic(err)
        }
        log.Info().Msgf("收到客户端连接,创建新会话,对端地址:%s", session.RemoteAddr().String())
    
        for !session.IsClosed() {
            stream, err := session.AcceptStream()
            if err != nil {
                fmt.Println(err.Error())
                break
            }
            go StreamHandler(stream)
        }
        log.Info().Msgf("客户端连接断开,销毁会话,对端地址:%s", session.RemoteAddr().String())
    }
    
    /**
    流数据处理
    */
    func StreamHandler(stream *smux.Stream) {
        buffer := make([]byte, 1024)
        n, err := stream.Read(buffer)
        if err != nil {
            log.Error().Msgf("流id:%d,异常信息:%s", stream.ID(), err.Error())
            stream.Close()
            return
        }
        cmd := buffer[:n]
        if bytes.Equal(cmd, []byte{'R', 'A', 'N', 'D'}) {
            rand := rand.Uint64()
            response := make([]byte, 8)
            binary.BigEndian.PutUint64(response, rand)
            stream.Write(response)
            log.Debug().Msgf("收到客户端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
        } else {
            log.Warn().Msgf("收到未知请求命令,流id:%d,请求命令:%v", stream.ID(), cmd)
        }
    }
    

    2.api接口服务 apiservice.go

    package main
    
    import (
        "encoding/binary"
        "fmt"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "github.com/xtaci/smux"
        "net"
        "net/http"
        "runtime"
    )
    
    /**
    随机数服务客户端连接
    */
    var randClient *smux.Session
    
    func init() {
        //连接后端随机数服务
        conn, err := net.Dial("tcp", ":9000")
        if err != nil {
            log.Warn().Msg("随机数服务未启动")
            panic(err)
        }
        session, err := smux.Client(conn, nil)
        if err != nil {
            log.Error().Msg("打开会话失败")
            panic(err)
        }
        randClient = session
    }
    
    /**
    一个api网关,对外提供api接口
    调用随机数服务来获取随机数
    */
    func main() {
        defer randClient.Close()
        http.HandleFunc("/rand", RandHandler)
        http.ListenAndServe(":8080", nil)
    }
    
    /**
    随机数接口
    */
    func RandHandler(w http.ResponseWriter, r *http.Request) {
        stream, err := randClient.OpenStream()
        if err != nil {
            w.WriteHeader(500)
            fmt.Fprint(w, err.Error())
        } else {
            log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())
            defer stream.Close()
            stream.Write([]byte{'R', 'A', 'N', 'D'})
            buffer := make([]byte, 1024)
            n, err := stream.Read(buffer)
            if err != nil {
                w.WriteHeader(500)
                fmt.Fprint(w, err.Error())
            } else {
                response := buffer[:n]
                var rand = binary.BigEndian.Uint64(response)
                log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
                fmt.Fprintf(w, "%d", rand)
            }
        }
    }
    

    原理分析

    smux将socket连接封装成session,每次请求响应封装成一个stream,通过自定义协议发送数据

    VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)  
    
    VALUES FOR LATEST VERSION:
    VERSION:
        1/2
        
    CMD:
        cmdSYN(0)
        cmdFIN(1)
        cmdPSH(2)
        cmdNOP(3)
        cmdUPD(4)   // only supported on version 2
        
    STREAMID:
        client use odd numbers starts from 1
        server use even numbers starts from 0
        
    cmdUPD:
        | CONSUMED(4B) | WINDOW(4B) |
    

    比如我们发送的RAND命令封装成以下数据包发送给服务端,假设请求的STREAMID为11223344

    VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | RAND
    
    VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | 0102030405060708
    

    扩展优化

    但是这样又导致了另一个问题,由于apiservice和randservice之间只有一个连接,而这一个连接只能由一个goroutine处理,这样就导致性能低下
    所以进一步扩展apiservice和randservice之间建立固定数量的连接,如10个连接,用来处理所有的请求,就是通过连接池的方式来性能最大化

    改造后的示意图如下:

                   n link                  10 link
    +-----------+          +-------------+           +---------------+
    |           <---------->             <----------->               |
    |  client   <---------->  apiservice <----------->  randservice  |
    |           <---------->             <----------->               |
    +-----------+          +-------------+           +---------------+
    

    连接池版代码 apiservicewithpool.go

    package main
    
    import (
        "context"
        "encoding/binary"
        "fmt"
        cpool "github.com/jolestar/go-commons-pool/v2"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "github.com/xtaci/smux"
        "net"
        "net/http"
        "runtime"
    )
    
    var commonPool *cpool.ObjectPool
    var ctx = context.Background()
    
    func init() {
        factory := cpool.NewPooledObjectFactorySimple(NewSessionCpool)
    
        commonPool = cpool.NewObjectPoolWithDefaultConfig(ctx, factory)
        commonPool.Config.MaxTotal = 10
    }
    
    /**
    连接池生成新会话函数
    */
    func NewSessionCpool(ctx context.Context) (interface{}, error) {
        log.Debug().Msg("连接池中生成一个连接")
        //连接后端随机数服务
        conn, err := net.Dial("tcp", ":9000")
        if err != nil {
            log.Warn().Msg("随机数服务未启动")
            panic(err)
        }
        //随机数服务客户端连接
        session, err := smux.Client(conn, nil)
        if err != nil {
            log.Error().Msg("打开会话失败")
            panic(err)
        }
        return session, err
    }
    
    /**
    一个api网关,对外提供api接口
    调用随机数服务来获取随机数
    
    通过sync.Pool实现“连接池” !!! 不推荐这种方式,sync.Pool的种种特性不适合作为连接池
    */
    func main() {
        http.HandleFunc("/rand", CommonPoolRandHandler)
        http.ListenAndServe(":8080", nil)
    }
    
    /**
    随机数接口
    */
    func CommonPoolRandHandler(w http.ResponseWriter, r *http.Request) {
        obj, err := commonPool.BorrowObject(ctx)
        if err != nil {
            w.WriteHeader(500)
            fmt.Fprint(w, err.Error())
            return
        }
        client := obj.(*smux.Session)
        stream, err := client.OpenStream()
        if err != nil {
            w.WriteHeader(500)
            fmt.Fprint(w, err.Error())
        } else {
            log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())
            defer stream.Close()
            stream.Write([]byte{'R', 'A', 'N', 'D'})
            buffer := make([]byte, 1024)
            n, err := stream.Read(buffer)
            if err != nil {
                w.WriteHeader(500)
                fmt.Fprint(w, err.Error())
            } else {
                response := buffer[:n]
                var rand = binary.BigEndian.Uint64(response)
                log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
                fmt.Fprintf(w, "%d", rand)
            }
        }
        commonPool.ReturnObject(ctx, obj)
    }
    

    经过连接池改造后的模型就像MySQL或Redis的使用场景,每次请求相当于一个stream,多个stream共用一个session,一个session背后有一个socket连接,程序和MySQL或Redis之间创建多个session放入连接池中,每次请求从连接池中拿出session进行读写操作

    相关文章

      网友评论

        本文标题:golang socket连接复用 - smux

        本文链接:https://www.haomeiwen.com/subject/jpdltktx.html