TCP异步框架
Golang 编程风格
- Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。
掌握Go语言,要把握一个中心,两个基本点。
- 一个中心是Go语言并发模型,即不要通过共享内存来通信,要通过通信来共享内存;
-
两个基本点是Go语言的并发模型的两大基石:
channel
和go-routine
。
不要通过共享内存来通信,要通过通信来共享内存
这句话的大概解释是: 不要通过共享内存来实现通信,这是因为在复杂的分布式、多线程和多进程之间通过加锁等控制并发方式来保证数据的正确性,是非常困难和低效的。建议线程之间通过通道channel
来实现通知,降低数据的竞争,提高系统的可靠性和正确性。
1. 服务启动开始
1.1 启动心跳定时器循环
func (s *Server) timeOutLoop() {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case timeout := <-s.timing.TimeOutChannel():
netID := timeout.Ctx.Value(netIDCtx).(int64)
if v, ok := s.conns.Load(netID); ok {
sc := v.(*ServerConn)
sc.timerCh <- timeout
} else {
holmes.Warnf("invalid client %d", netID)
}
}
}
}
当服务开始的时候就开始了定时器循环timeOutLoop
来维护Clinet
连接服务的应用层心跳,在一个goroutine
中通过select
一直监控服务中名为timeOutChan
定时任务的channel
,
如果有定时任务到来,通过context
上下文获取netIDCtx
,这是TCP连接唯一标识ID,根据这个ID我们可以找到相应的ServerConn
(ServerConn:这是对于TCP连接,上层又一次的连接封装。其中主要包含三个重要的channel
,分别是sendCh
,handlerCh
和timerCh
,下面会详细介绍)。
这样就可以把定时到期任务放到相应ServerConn
的timeCh
中了,由该连接处理定时到期任务的执行。
1.2 服务启动限制处理
- 如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
- 如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。
2. 网络连接处理模块
func (sc *ServerConn) Start() {
holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
onConnect := sc.belong.opts.onConnect
if onConnect != nil {
onConnect(sc)
}
loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
for _, l := range loopers {
looper := l
sc.wg.Add(1)
go looper(sc, sc.wg)
}
}
在别的编程语言中,采用Reactor
模式编写的服务器往往需要在一个IO线程异步地通过epoll
进行多路复用。而因为Go线程的开销廉价,Go语言可以对每一个网络连接创建三个goroutine。
-
readLoop()
负责读取数据并反序列化成消息。 -
writeLoop()
负责序列化消息并发送二进制字节流。 -
handleLoop()
负责调用消息处理函数。
这三个协程在连接创建并启动时就会各自独立运行。
2.1 ReadLoop 实现细节
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
default:
msg, err = codec.Decode(rawConn)
if err != nil {
holmes.Errorf("error decoding message %v\n", err)
if _, ok := err.(ErrUndefined); ok {
// update heart beats
setHeartBeatFunc(time.Now().UnixNano())
continue
}
return
}
setHeartBeatFunc(time.Now().UnixNano())
handler := GetHandlerFunc(msg.MessageNumber())
if handler == nil {
if onMessage != nil {
holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
onMessage(msg, c.(WriteCloser))
} else {
holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
}
continue
}
handlerCh <- MessageHandler{msg, handler}
}
}
在Readloop 循环中通过codec
来读取网络rawConn
连接中数据包,并且返回的是解析后的数据。
而codec
使用的解析函数是在服务启动的时候注册的,注册的还有该类型数据的执行函数,以消息类型为key
保存在message.go
包中。
解析成功后再获取该消息的执行函数,二者封装成MessageHandler
发送到handlerCh
中。供HandleLoop
循环执行。
2.2 HandleLoop 实现细节
for {
select {
case <-cDone: // connectin closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case msgHandler := <-handlerCh:
msg, handler := msgHandler.message, msgHandler.handler
if handler != nil {
if askForWorker {
err = WorkerPoolInstance().Put(netID, func() {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
})
if err != nil {
holmes.Errorln(err)
}
addTotalHandle()
} else {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
}
}
case timeout := <-timerCh:
if timeout != nil {
timeoutNetID := NetIDFromContext(timeout.Ctx)
if timeoutNetID != netID {
holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
}
if askForWorker {
err = WorkerPoolInstance().Put(netID, func() {
timeout.Callback(time.Now(), c.(WriteCloser))
})
if err != nil {
holmes.Errorln(err)
}
} else {
timeout.Callback(time.Now(), c.(WriteCloser))
}
}
}
}
在HandleLoop
循环中,主要监听handlerCh
和timerCh
,一个是消息执行channel
,一个是定时任务到期channel
。
-
handlerCh
处理都是ReadLoop循环中发送过来的数据,通过异步任务池来执行任务。 -
timerCh
处理的该连接下定时任务的执行,也是通过异步任务池来执行任务。
2.2 WriteLoop 实现细节
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case pkt = <-sendCh:
if pkt != nil {
if _, err = rawConn.Write(pkt); err != nil {
holmes.Errorf("error writing data %v\n", err)
return
}
}
}
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
msg := tao.MessageFromContext(ctx).(Message)
holmes.Infof("receving message %s\n", msg.Content)
conn.Write(msg)
}
在WriteLoop
循环中,主要监听sendCh
,它会非阻塞地将sendCh中的消息全部发送完毕再退出,避免漏发消息。
而sendCh
消息的传入是在服务开始的时候message
注册的,在ProcessMessage
中通过Write
异步写入到 sendCh
中。
3. 总结
在Tao
框架中三大循环ReadLoop
、HandleLoop
和WriteLoop
是整个的核心代码,这三个Loop
中是通过channel
来实现数据的传递,而每一个TCP连接都会实现这三个goroutine
。每一个goroutine
都是独立运行。
-
框架支持通过tao.TLSCredsOption()函数提供传输层安全的TLS Server
-
而在在我们开发不同的业务中,编写业务代码是在自定义
message
当中。需要实现DeserializeMessage
解析该类型数据包函数和ProcessMessage
只用该消息的函数。 -
在框架中使用
context
联系程序上线文,使得程序能够优雅的退出。 -
Context
的使用在另一篇文章中Golang并发模型。 -
至于在框架中定时器的实现分析在另一篇文章中 Golang-基于TimeingWheel定时器。
4. 感谢
感谢leesper
为开源社区做出的贡献,提供我们学习。
网友评论