美文网首页
GO实现千万级WebSocket消息推送服务技术分析

GO实现千万级WebSocket消息推送服务技术分析

作者: yichen_china | 来源:发表于2021-01-22 13:26 被阅读0次

    慕课网地址:https://www.imooc.com/learn/1025

    原文地址:https://blog.csdn.net/Wing_93/article/details/81587809

    拉模式和推模式区别

    拉模式(定时轮询访问接口获取数据)

    • 数据更新频率低,则大多数的数据请求时无效的
    • 在线用户数量多,则服务端的查询负载很高
    • 定时轮询拉取,无法满足时效性要求

    推模式(向客户端进行数据的推送)

    • 仅在数据更新时,才有推送
    • 需要维护大量的在线长连接
    • 数据更新后,可以立即推送

    基于WebSocket协议做推送

    • 浏览器支持的socket编程,轻松维持服务端的长连接
    • 基于TCP协议之上的高层协议,无需开发者关心通讯细节
    • 提供了高度抽象的编程接口,业务开发成本较低

    WebSocket协议的交互流程

    image

    客户端首先发起一个Http请求到服务端,请求的特殊之处,在于在请求里面带了一个upgrade的字段,告诉服务端,我想生成一个websocket的协议,服务端收到请求后,会给客户端一个握手的确认,返回一个switching, 意思允许客户端向websocket协议转换,完成这个协商之后,客户端与服务端之间的底层TCP协议是没有中断的,接下来,客户端可以向服务端发起一个基于websocket协议的消息,服务端也可以主动向客户端发起websocket协议的消息,websocket协议里面通讯的单位就叫message。

    传输协议原理

    • 协议升级后,继续复用Http协议的底层socket完成后续通讯
    • message底层会被切分成多个frame帧进行传输,从协议层面不能传输一个大包,只能切成一个个小包传输
    • 编程时,只需操作message,无需关心frame(属于协议和类库自身去操作的)
    • 框架底层完成TCP网络I/O,WebSocket协议的解析,开发者无需关心

    服务端技术选型与考虑

    NodeJs

    • 单线程模型(尽管可以多进程),推送性能有限

    C/C++

    • TCP通讯、WebSocket协议实现成本高

    Go

    • 多线程,基于协程模型并发
    • Go语言属于编译型语言,运行速度并不慢
    • 成熟的WebSocket标准库,无需造轮子

    基于Go实现WebSocket服务端

    用Go语言对WebSocket做一个简单的服务端实现,以及HTML页面进行调试,并对WebSocket封装,这里就直接给出代码了。

    WebSocket服务端

    
    package main
     
    import (
            "net/http"
            "github.com/gorilla/websocket"
            "github.com/myproject/gowebsocket/impl"
            "time"
            )
    var(
        upgrader = websocket.Upgrader{
            // 允许跨域
            CheckOrigin:func(r *http.Request) bool{
                return true
            },
        }
    )
     
    func wsHandler(w http.ResponseWriter , r *http.Request){
        //  w.Write([]byte("hello"))
        var(
            wsConn *websocket.Conn
            err error
            conn *impl.Connection
            data []byte
        )
        // 完成ws协议的握手操作
        // Upgrade:websocket
        if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
            return 
        }
     
        if conn , err = impl.InitConnection(wsConn); err != nil{
            goto ERR
        }
     
        // 启动线程,不断发消息
        go func(){
            var (err error)
            for{
                if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
                    return 
                }
                time.Sleep(1*time.Second)
            }
        }()
     
        for {
            if data , err = conn.ReadMessage();err != nil{
                goto ERR
            }
            if err = conn.WriteMessage(data);err !=nil{
                goto ERR
            }
        }
     
        ERR:
            conn.Close()
     
    }
     
    func main(){
     
        http.HandleFunc("/ws",wsHandler)
        http.ListenAndServe("0.0.0.0:7777",nil)
    }
    

    前端页面

    <!DOCTYPE html>
    <html>
    <head>
        <title>go websocket</title>
        <meta charset="utf-8" />  
    </head>
    <body>
        <script type="text/javascript">
            var wsUri ="ws://127.0.0.1:7777/ws"; 
            var output;  
            
            function init() { 
                output = document.getElementById("output"); 
                testWebSocket(); 
            }  
         
            function testWebSocket() { 
                websocket = new WebSocket(wsUri); 
                websocket.onopen = function(evt) { 
                    onOpen(evt) 
                }; 
                websocket.onclose = function(evt) { 
                    onClose(evt) 
                }; 
                websocket.onmessage = function(evt) { 
                    onMessage(evt) 
                }; 
                websocket.onerror = function(evt) { 
                    onError(evt) 
                }; 
            }  
         
            function onOpen(evt) { 
                writeToScreen("CONNECTED"); 
               // doSend("WebSocket rocks"); 
            }  
         
            function onClose(evt) { 
                writeToScreen("DISCONNECTED"); 
            }  
         
            function onMessage(evt) { 
                writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>'); 
               // websocket.close(); 
            }  
         
            function onError(evt) { 
                writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data); 
            }  
         
            function doSend(message) { 
                writeToScreen("SENT: " + message);  
                websocket.send(message); 
            }  
         
            function writeToScreen(message) { 
                var pre = document.createElement("p"); 
                pre.style.wordWrap = "break-word"; 
                pre.innerHTML = message; 
                output.appendChild(pre); 
            }  
         
            window.addEventListener("load", init, false);  
            function sendBtnClick(){
                var msg = document.getElementById("input").value;
                doSend(msg);
                document.getElementById("input").value = '';
            }
            function closeBtnClick(){
                websocket.close(); 
            }
        </script>
        <h2>WebSocket Test</h2>  
        <input type="text" id="input"></input>
        <button onclick="sendBtnClick()" >send</button>
        <button onclick="closeBtnClick()" >close</button>
        <div id="output"></div>     
        
    </body>
    </html>
    

    封装WebSocket

    package impl
     
    import (
            "github.com/gorilla/websocket"
            "sync"
            "errors"
            )
     
    type Connection struct{
        wsConnect *websocket.Conn
        inChan chan []byte
        outChan chan []byte
        closeChan chan byte
     
        mutex sync.Mutex  // 对closeChan关闭上锁
        isClosed bool  // 防止closeChan被关闭多次
    }
     
    func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
        conn = &Connection{
            wsConnect:wsConn,
            inChan: make(chan []byte,1000),
            outChan: make(chan []byte,1000),
            closeChan: make(chan byte,1),
     
        }
        // 启动读协程
        go conn.readLoop();
        // 启动写协程
        go conn.writeLoop();
        return
    }
     
    func (conn *Connection)ReadMessage()(data []byte , err error){
        
        select{
        case data = <- conn.inChan:
        case <- conn.closeChan:
            err = errors.New("connection is closeed")
        }
        return 
    }
     
    func (conn *Connection)WriteMessage(data []byte)(err error){
        
        select{
        case conn.outChan <- data:
        case <- conn.closeChan:
            err = errors.New("connection is closeed")
        }
        return 
    }
     
    func (conn *Connection)Close(){
        // 线程安全,可多次调用
        conn.wsConnect.Close()
        // 利用标记,让closeChan只关闭一次
        conn.mutex.Lock()
        if !conn.isClosed {
            close(conn.closeChan)
            conn.isClosed = true 
        }
        conn.mutex.Unlock()
    }
     
    // 内部实现
    func (conn *Connection)readLoop(){
        var(
            data []byte
            err error
            )
        for{
            if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
                goto ERR
            }
    //阻塞在这里,等待inChan有空闲位置
            select{
                case conn.inChan <- data:
                case <- conn.closeChan:     // closeChan 感知 conn断开
                    goto ERR
            }
            
        }
     
        ERR:
            conn.Close()
    }
     
    func (conn *Connection)writeLoop(){
        var(
            data []byte
            err error
            )
     
        for{
            select{
                case data= <- conn.outChan:
                case <- conn.closeChan:
                    goto ERR
            }
            if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
                goto ERR
            }
        }
     
        ERR:
            conn.Close()
     
    }
    

    千万级弹幕系统的架构设计

    技术难点

    内核瓶颈

    • 推送量大:100W在线 * 10条/每秒 = 1000W条/秒
    • 内核瓶颈:linux内核发送TCP的极限包频 ≈ 100W/秒

    锁瓶颈

    • 需要维护在线用户集合(100W用户在线),通常是一个字典结构
    • 推送消息即遍历整个集合,顺序发送消息,耗时极长
    • 推送期间,客户端仍旧正常的上下线,集合面临不停的修改,修改需要遍历,所以集合需要上锁

    CPU瓶颈

    • 浏览器与服务端之间一般采用的是JSon格式去通讯
    • Json编码非常耗费CPU资源
    • 向100W在线推送一次,则需100W次Json Encode

    优化方案

    内核瓶颈

    • 减少网络小包的发送,我们将网络上几百字节定义成网络的小包了,小包的问题是对内核和网络的中间设备造成处理的压力。方案是将一秒内N条消息合并成1条消息,合并后,每秒推送数等于在线连接数。

    锁瓶颈

    • 大锁拆小锁,将长连接打散到多个集合中去,每个集合都有自己的锁,多线程并发推送集合,线程之间推送的集合不同,所以没有锁的竞争关系,避免锁竞争。
    • 读写锁取代互斥锁,多个推送任务可以并发遍历相同集合

    CPU瓶颈

    • 减少重复计算,Json编码前置,1次消息编码+100W次推送,消息合并前置,N条消息合并后,只需要编码一次。

    单机架构

    image

    最外层是在线的长连接,连接到服务端后,打散到多个集合里面存储,我们要发送的消息呢,通过打包后,经过json编码,被多个线程或协程分发到多个集合中去,最终推给了所有的在线连接。

    单机瓶颈

    • 维护海量长连接,会花费不少内存
    • 消息推送的瞬时,消耗大量的CPU
    • 消息推送的瞬时带宽高达400-600Mb(4-6Gbits),需要用到万兆网卡,是主要瓶颈

    集群

    部署多个节点,通过负载均衡,把连接打散到多个 服务器上,但推送消息的时候,不知道哪个直播间在哪个节点上,最常用的方式是将消息广播给所有的网关节点,此时就需要做一个逻辑集群。

    逻辑集群

    • 基于Http2协议向gateway集群分发消息(Http2支持连接复用,用作RPC性能更佳,即在单个连接上可以做高吞吐的请求应答处理)
    • 基于Http1协议对外提供推送API(Http1更加普及,对业务方更加友好)

    整体分布式架构图如下:

    image

    任何业务方通过Http接口调用到逻辑集群,逻辑集群把消息广播给所有网关,各个网关各自将消息推送给在线的连接即可。

    本文讲解了开发消息推送服务的难点与解决方案的大体思路,按照整个理论流程下来,基本能实现一套弹幕消息推送的服务。

    相关文章

      网友评论

          本文标题:GO实现千万级WebSocket消息推送服务技术分析

          本文链接:https://www.haomeiwen.com/subject/zlzdzktx.html