简介
cmux is a generic Go library to multiplex connections based on their payload. Using cmux, you can serve gRPC, SSH, HTTPS, HTTP, Go RPC, and pretty much any other protocol on the same TCP listener.
当你需要开发一个同时兼容多通信协议应用的时候,为了让不同的通讯协议框架能够复用相同的业务逻辑层,则需要进行千方百计的封装,此时总会思考一个问题: 为什么没有办法使用一个端口就可以复用多个协议呢?
cmux
就是为了解决这个问题而诞生的,它可以仅监听一个端口,然后根据注册的 协议匹配器(matcher) 找到对应协议的通信框架,我们可以通过一个简单的例子查看。
// 创建监听端口的 Listener
l, err := net.Listen("tcp", ":23456")
if err != nil {
log.Fatal(err)
}
// 根据创建成功的 Listener 初始化 cmux 实例
m := cmux.New(l)
// 注册不同协议的 *协议匹配器*
grpcL := m.Match(cmux.HTTP2HeaderField("context-type", "application/grpc")) // 使用 context-type 标识为 grpc 协议
httpL := m.Match(cmux.HTTP1Fast()) // 使用 cmux 内置函数标识 http1 协议
trpcL := m.Match(cmux.Any()) // 使用 Any 函数匹配未匹配的任意协议
// 初始化不同协议的服务实例
grpcS := grpc.NewServer()
grpchello.RegisterGreeterServer(grpcs, &server{})
httpS := http.Server{
Handler: &helloHTTP1Handler{}
}
trpcS := rpc.NewServer()
s.Register(&ExampleRPCRcvr{})
// 启动所有注册协议的服务
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
go trpcS.Serve(trpcL)
m.Serve() // 启动 cmux 服务实例
在该例子中,应用仅监听了 23456 端口,却分别注册了 gRPC
、HTTP
、Thrift
等 3 种协议的服务端,并且使用了对应的 3 种不同 匹配器 帮助 cmux
将请求按照规则分配到不同的协议服务中。
因此,我们只要将业务逻辑使用适配器模式封装好,当需要接入不同协议的时候,仅需编写协议适配层即可复用一致的业务代码。
源码解析
cmux 的核心主要分为 多路复用器 和 匹配器 的实现,要最简单的方法就是通过源码一探究竟。
cmux.go
框架中最核心的当属与包名相同的 CMux
接口,该接口定义了一个基于匹配模式的多路复用器行为。
// Matcher 根据连接的内容返回匹配的结果
type Matcher func(io.Reader) bool
// MatchWriter 根据连接的内容返回匹配的结果,且如果匹配成功后对响应进行写入
type MatchWriter func(io.Writer, io.Reader) bool
// ErrorHandler 处理一个错误并返回一个布尔值告诉多路复用器是否应该继续运行
type ErrorHandler func(error) bool
// Cmux 是一个网络连接的多路复用器
type Cmux interface {
// Match 返回一个 net.Listener 用于查看(接受)至少匹配中一个匹配器的网络连接
//
// 根据传入的顺序决定匹配器的优先级
Match(...Matcher) net.Listener
// MatchWithWriters 返回一个 net.Listener 用于接受至少匹配中一个匹配器的网络连接
//
// MatchWriters 可以在实际处理程序之前在连接上进行写操作。
//
// 根据传入的顺序决定匹配器的优先级
MatchWithWriters(...MatchWriter) net.Listener
// Serve 开始 net.Listener 的多路复用,该方法会阻塞且应该使用 goroutine 并发调用
Serve() error
// HandleError 注册一个错误回调函数用于处理监听器发生错误
HandleError(ErrorHandler)
// SetReadTimeout 设置读取匹配器列表的超时时间
SetReadTimeout(time.Duration)
}
结合 Cmux
接口的定义以及简介中的例子,我们可以大概想象出这个多路复用器的简单实现方式应该就是通过接受一个连接,然后通过遍历注册在多路复用器中的匹配器列表,找到对应的处理服务,然后调用这个服务的回调函数进行响应,伪代码大致如下所示:
for {
conn, _ := l.Accept() // 接受一个连接
for _, matcher := range matchers { // 遍历匹配器列表
if matched := matcher(conn); matched { // 如果找到匹配器
go matcher.handle(conn) // 启动 goroutine 处理连接
}
}
}
脑海中有了雏形之后,接下来就看看 cMux
的具体实现,这个类型实现了 Cmux
接口,并作为框架的默认实现。
type cMux struct {
root net.Listener
bufLen int // 匹配器中缓存连接的队列长度
errh ErrorHandler // 注册的错误处理函数
donec chan struct{} // 表示多路复用器是否应该关闭
sls []matchersListener // 注册的匹配器列表
readTimeout time.Duration // 表示读取匹配器列表的超时时间
}
// New 根据传入的网络监听器实例化一个「连接多路复用器」
func New(l net.Listener) CMux {
return &cMux{
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
readTimeout: noTimeout,
}
}
// Match 简化函数,会对 Matcher 进行转换为 MatchWriter 并调用 MatchWithWriters
func (m *cMux) Match(matchers ...Matcher) net.Listener { /* snip... */ }
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
// 为传入的匹配器列表初始化一个网络连接包装器,该包装器实现了 net.Listener 接口
// 用于返回给与匹配器对应的服务端进行连接的 「获取」、「处理」、「关闭」等操作
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
}
// 将传入的匹配器列表打包到 cmux 中
m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
return ml
}
func (m *cMux) SetReadTimeout(h ErrorHandler) { /* snip */ }
func (m *cMux) HandleError(h ErrorHandler) { /* snip */ }
func (m *cMux) Serve() error {
defer func() { /* 关闭 cmux 实例,并调用所有已注册的匹配器 Close */ }
for {
c, err := m.root.Accept()
// handle error
wg.Add(1)
go m.serve(c, m.donec, &wg)
}
}
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
// 将 net.Conn 包装为 MuxConn 并提供对连接数据的透明嗅探
muc := newMuxConn(c)
// ...
for _, sl := range m.sls { // 遍历已注册的匹配器列表
for _, s := range sl.ss { // s => MatchWriter
// 根据连接的内容返回匹配结果,如匹配且 io.Writer 非空则
// 对 muc.Conn 进行写入
matched := s(muc.Conn, muc.startSniffing())
if matched {
// ...
select {
// 将匹配成功的连接放入匹配器的缓存队列中,结束
case sl.l.connc <- muc:
// 如果多路复用器标识为终止,则关闭连接,结束
case <-donec:
_ = c.Close()
}
return
}
}
}
// 如果执行到这里,意味这个连接没有被任何已注册的匹配器所匹配成功
// 这里会将 ErrNotMatched 这个错误交给多路复用器的 「错误处理函数」
_ = c.Close()
err := ErrNotMatched{c: c}
if !m.handleErr(err) {
// ...
}
cMux
的核心逻辑在于方法 MatchWithWriters
和 Serve
都已经给出关键注释,与我们之前猜想的伪代码高度相似,至此,我们已经掌握了 cmux 库的核心代码实现。
matchers.go
在 cmux 中,匹配器(matcher) 是核心组件的一员,默认提供了 HTTP1.x / HTTP2.0 的一些包装函数,方便开发者通过简单的调用创建不同协议的匹配器。
在阅读该小节之前,你应该对 HTTP 协议有简单的认识
HTTP 1.x
在样例代码中, HTTP 服务端使用了 HTTP1Fast()
匹配器,我们可以通过该函数了解如何去实现一个简单的匹配器。
// HTTP 1.1 中 8 个合法的 HTTP Method
var defaultHTTPMethods = []string{
"OPTIONS",
"GET",
"HEAD",
"POST",
"PUT",
"DELETE",
"TRACE",
"CONNECT",
}
// HTTP1Fast 仅与 HTTP 请求中的方法进行匹配
//
// 这是一个乐观匹配器:即使匹配器返回 true ,也不意味这就是一个合法的 HTTP 请求
// 如果你需要的是一个能够匹配正确 HTTP1 协议的匹配器,请使用 HTTP1 代替,但会因此
// 损失部分性能
func HTTP1Fast(extMethods ...string) Matcher {
return PrefixMatcher(append(defaultHTTPMethods, extMethods...)...)
}
// PrefixMatcher 返回能够检测连接中字符串前缀是否一致的匹配器
func PrefixMatcher(strs ...string) Matcher {
pt := newPatriciaTreeString(strs...) // 根据字符串列表创建一个前缀树
return pt.matchPrefix
}
在 HTTP1Fast
中,直接了当地通过判断连接内容中的开头是否存在合法的 HTTP Method 字符串,如果找到,返回 true ,让初始化该匹配器的服务端进行下一步的请求处理。
接下来,我们看看能够匹配合法 HTTP1.x 的匹配器源码实现。
func HTTP1() Matcher {
return func(r io.Reader) bool {
// 读取连接的请求内容,最大读取长度为 4096
br := bufio.NewReader(&io.LimitedReader{R: r, N: maxHTTPRead})
l, part, err := br.ReadLine() // 尝试读取请求内容的第一行字符
if err != nil || part {
// 如果读取发生错误或者第一行长度大于 4096,返回 false
return false
}
// 将第一行内容尝试解析出 HTTP1.x 协议相关信息
// parseRequestLine(line) (method, uri, proto, ok)
_, _, proto, ok := parseRequestLine(string(l))
if !ok {
return false
}
// 判断请求的 HTTP 协议版本是否为 1.x
// 如果是则返回 true,反之返回 false
v, _, ok := http.ParseHTTPVersion(proto)
return ok && v == 1
}
}
与 HTTP1Fast 完全不一样, HTTP1 匹配器是通过读取 HTTP 头的协议相关信息,并在正确解析出后判断版本号是否合法,如果你的需求是希望响应正确的 HTTP 请求,并且不在意性能损耗,请使用 HTTP1 匹配器。
此外,cmux 还提供了 HTTP1HeaderField
和 HTTP1HeaderFieldPrefix
匹配器用于对 HTTP1 的请求头进行匹配。
HTTP 2.0
HTTP 2.0 所令人熟知的一个特性就是: 多路复用 ,因此对于 HTTP 2.0 协议请求的匹配机制必然会发生变化。
// HTTP2 解析连接的第一个帧头,用于识别该连接是否为 HTTP2 协议
func HTTP2() Matcher {
return hasHTTP2Preface
}
func hasHTTP2Preface(r io.Reader) bool {
// 创建一个 http2.ClientPreface 大小的缓冲数组,用于判断连接的
// 第一帧是否是 http2 协议
var b [len(http2.ClientPreface)]byte
last := 0 // 游标
for {
n, err := r.Read(b[last:])
if err != nil {
return false
}
// 判断从连接读取到的内容是否与 http2.ClientPreface 一致,
// 如一致则为 http2 协议,反之为非 http2 协议
last += n
eq := string(b[:last]) == http2.ClientPreface[:last]
if last == len(http2.ClientPreface) {
return eq
}
if !eq {
return false
}
}
}
cmux 同时也提供了与 HTTP2HeaderField
与 HTTP2HeaderFieldPrefix
匹配器。
延伸
限制性
使用 cmux
可以大大降低开发及维护重复业务代码的精力,但是也有其局限性
TLS
在 net/http
中通过 类型断言 判断是否为 TLS 连接,但由于 cmux 会对连接进行一次包装(MuxConn
),导致标准库无法正确断言出 TLS 连接,因此你可以使用 cmux 提供 HTTPS 服务,但是却无法在处理函数(Handler)中获取 http.Request.TLS
。
相同连接多协议
通过源码解析我们也大概可以了解到, cmux 是在接受连接的时候进行匹配的,如果遇到相同连接同时使用多种协议,cmux 就无法处理了。
不能与 fasthttp 框架结合
因为 cmux 与 fasthttp 框架同时实现了 会话层 ,而且 fasthttp (非 wrapper)针对性能优化对大量的接口进行了修改 ,所以无法通过直接的嵌套同时获得 多协议支持 和 高性能 ,但是我们可以参考 fasthttp 的思想对 cmux 进行性能优化,如:协程池等。
gRPC 服务无响应
如果你是 1.17 及之前的版本 grpc-go 升级到新版本的 grpc-go ,你可能会发现客户端无法发起调用并返回如下的错误描述:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: timed out waiting for server handshake
这是 gRPC 升级带来的兼容性问题,在新版本中,grpc 客户端在进行发送请求之前必须要与服务端进行健康检测,否则客户端的请求将会被阻塞。
这个特性是为了防止 grpc 客户端以一种 self-DoS 的方式导致自身出现问题,具体的描述可以参见 grpc/grpc-go/#2406 以及 grpc/grpc#17006 。
为了解决这个问题,需要将 cmux 升级到最新版本,并将 grpc 的匹配器修改为:
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
网友评论