http服务端是从*http.Server.ListenAndServe()开始的。Server数据结构定义了Handler字段,声明了http服务器如何处理(w,*r)
type Server struct {
Handler Handle
}
Handler接口定义:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
Handler可以自定义实现,或是由上层框架实现。服务端监听工作,已由net/http标准库完成。
首先是外层的监听函数Serve(l)用来监听tcp连接,有连接建立时accepted,开一个协程处理请求。服务器用一个线程处理多个客户端请求,是典型的异步应用场景。由于go语言支持协程,代替了这样的应用场景,go serve(ctx)简单的一行代码,就完成了复杂的异步调用完成的功能。并且业务逻辑不分节,不需要关心线程池的调参。
协程中执行的闭包,主要由以下三块逻辑:
- 从TCP连接的buf缓冲中读取请求字节流,并构建Request请求。
- 调用http.Server.Handler处理请求。
- fnishRequest刷新TCP连接的缓存,完成http响应。
Serve(l)获取连接
sync.Once
Java的两种实现
举一个例子,JDK7中的concurrentHashmap在构建segment时,会保证构建唯一的Segment。是通过CAS无锁操作实现的:volatile判空,线程中构建,CAS赋值。常规的方法是添加的全局的volatile bool,在构造函数的最后一行利用volatile禁止重排序的语义,标记构造已完成。并且要对构造语句的集合加锁,保证其原子性。
//解释java的思想,用go语言实现
var done bool = false
func setup(){
a = "hello"
done = true
}
func doprint(){
if !done{
setup()
}
print(a)
}
而go中也有类似的功能,sync.Once,通过Mutex实现。功能为保证闭包全局的唯一一次执行。
var once sync.Once
once.Do()
lintener
在TCP监听中,监听线程只有一个listener,用sync.Once包裹。第个Serve(l)方法只管理一个监听器。当接受到并建立连接之用,为每个连接单开一个协程处理。并调用trackListener方法,将其加入到Server.listeners。如果*Server正在执行关闭,则不进行添加。
Serve(l)方法结束之前,将连接移除。
s.shuttingDown()是一个标志位方法,如果将其置为true,所有新建连接、处理请求等操作,在执行之前都会检查这个标志位。类似于java的volatile并发读可见性语义。s.shutdown标志位采用了atomic.LoadInt32(&s.inShutdown)的方式确保其并发读一致性。
Context配置
由于go没有继承,常用第一形参来代替被继承的对象。所以这样go可以利用形参,实现“多继承”。总结golang的形参特性:
- 第一形参包含一个父类实例,代替继承
- 支持闭包传递,减少继承使用
- 引用传递修改结果,代替返回值。由于Go本身支持多返回值,这个特性效果不明显
- 支持不定参,切片打散传入
TCP服务端,通过向ctx加入“http-serve”:*Server的KV对来标注。
循还体内持续监听
调用l.Accept()阻塞等待监听,当有连接建立,将返回net.Conn实例。
检测doneChan通道,是一个结束监听的开关。使能之后遇到,处理完已连接请求之后,将立即结束监听。
TCP连接具有超时重试机制,逻辑为:
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
//未连接成功,等待5ms 10ms 20ms...1s不断重试。
总结未正确得到TCP连接的情况:
- doneChan开关使能,终止程序。
- 超时重试。
- 其它错误,终止程序
最后新服务包裹net.Conn代表的TCP连接 和 服务端实例*Server。并新建一个协程处理TCP连接的数据包请求。
由于并发线程非常有限,使用并发地方式,一个线程处理一个连接请求,会极大限制服务器处理连接的数量(个人测试,简单业务逻辑,16G内存最大约开4000个线程)。因而产生了异步式编程,只用一个监听,处理更大量的连接请求,由于请求是IO密集型操作,通过异步编程可以有效提高其并发度。具体执行实际仍交给线程池执行。不过异步编程的缺点也十分明显,程序逻辑要截成两段,拿到异步结果之后的处理逻辑还要再写一段,同时程序还需要用并发哈希临时保存未完成的请求,以及异步结果提前初始化好对象,等待接收。
有了协程和闭包,go语言的处理连接请求方法结构十分清晰,只有一行:
go c.serve(ctx)
在TCP层面,为每个连接单独创建协程处理。在读请求的完整包并作http协议层解析,会使用对象池,减少GC压力,提高处理并发度。
c.serve(ctx)监听连接
net.Conn装饰
连接的数据结构:
type conn struct {
server *Server
rwc net.Conn
remoteAddr string
tlsState *tls.ConnectionState
cancelCtx context.CancelFunc
werr error
r *connReader
bufr *bufio.Reader
bufw *bufio.Writer
lastMethod string
mu sync.Mutex
hijackedv bool
}
HTTP连接共有三块内容:-
- 上下层:上层Server服务端,下层的TCP连接 客户端地址 TLS信息
- 读写缓冲区
- 状态信息:最后http请求方式,当前请求,当前状态,是否被handler劫持。
注意到读缓冲区有两种数据结构 常规的bufio.Reader和net/http包实现的connReader:
type connReader struct {
conn *conn
mu sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond *sync.Cond
inRead bool
aborted bool // set true before conn.rwc deadline is set to past
remain int64 // bytes remaining
}
除了bufio.Reader缓冲区之外,net/http实现的connReader是net.Conn的包裹,为bufio.Reader提供缓冲数据。为何实现两个io.Reader?后面的内容会详述。
serve(ctx)方法,首先在ctx中保存客户端地址c.rwc.LocalAddr()。
ctx装饰类emptyCtx cancelCtx timerCtx
Context是一个可配置生命周期的,被多个协程并发访问的KV存储结构。可设置生命周期,或主动释放(可以被GC)。
其接口为:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
对于连接的ctx采用如下装饰,扩充其原有的功能
- emptyCtx int类型而非struct{},仅实现Context接口,供context.Background()调用创建空ctx。同样功能的还有context.Todo()
- valueCtx 用于ctx的PUT/GET操作
- cancelCtx,带有cancelFunc的ctx。
- timerCtx,具有超时机制的ctx。
先看ctx最基本的Put Get操作。
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
type valueCtx struct {
Context
key, val interface{}
}
ctx添加的KV值,直接存在继承类的新字段中。反射Type有Comparable()方法,可用来检查类型是否可以被用来作key。不同于哈希,加参数并不检查key是否已存在。
获取字段用递归查找其所有的KV。有一个即返回,不检查全部。
到这里看似这个数据结构和map[]功能并没有区别,实现却坡为复杂。单独写一个这样的数据结构,同样的KV对之间多了一层父子关系。也就是说参数是像压栈一样被压入的,当需要一个元素出栈,所有后压入的元素也会跟着出栈。由于KV对之间存在父子关系,所以允许相同的key存在。意义在于其concelFunc:
Calling the CancelFunc cancels the child and its children, removes the parent's reference to the child, and stops any associated timers.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
canceler接口的实现类就是*cancelCtx and *timerCtx。
创建方法:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
其中Canceled是个构建error对象,为打印的信息。方法返回cancelCtx和取消ctx的闭包。propagateCancel方法功能:向上面的parent找一个cancelCtx,如果这个cancelCtx已经取消,则当前ctx也取消。否则加入p.children[]添加关联,等待被取消。总之功能为,向上建立父子关系,这种关系是cancelCtx之间的。
cancelCtx.Done()功能:加锁创建cancelCtx.done通道
cancel.calcel()功能:关闭c.done通道,递归调用所有c.children.cancel(),释放c.children = nil
c.child的父子关系,并不是相邻的valueCtx继承关系,而是相邻的cancelCtx之间的关系。
再来看看timerCtx:
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
可以配置截止日期。使用了time.AfterFunc(dur, func() )到期后,自动清理。
AfterFunc waits for the duration to elapse and then calls f in its own goroutine. It returns a Timer that can be used to cancel the call using its Stop method.
最后利用以上代码,直接实现最常用的为ctx添加超时机制。
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
总结:这种数据结构为大量的KV配置信息,提供批量释放的功能。功能类似于etcd为一组相似的配置信息,配置相同的租约。最终使用GC更为高效,提高内存利用率。
connReader bufio.Reader bufio.Writer
c.bufw是池化的,且指定了c.bufw.buf的数组大小为4<<10。并且根据size大小,分为bufioWriter2kPool和bufioWriter4kPool。
bufio.Reader的最小池大小为4M字节。
读取并解析请求
完成以上配置之后循还获取数据,并解析为空的Response:
w, err := c.readRequest(ctx)
遵守RFC 7231 5.1.1作Expect校验
如果请求Header中有"Expect":"100-continue"则继续,否则关闭http连接。
For now we'll just obey RFC 7231 5.1.1 which says "A server that receives an Expect field-value other than 100-continue MAY respond with a 417 (Expectation Failed) status code to indicate that the unexpected expectation cannot be met."
结束请求的工作:
w.Header().Set("Connection", "close")
w.WriteHeader(StatusExpectationFailed) //返回417状态码
w.finishRequest() //刷新缓存相关操作
注意到req.Body是包裹了Response的。
为req.Body添加hitEof的后续处理函数
先上body数据结构:
type body struct {
src io.Reader
hdr interface{} // non-nil (Response or Request) value means read trailer
r *bufio.Reader // underlying wire-format reader for the trailer
closing bool // is the connection to be closed after reading body?
doEarlyClose bool // whether Close should stop early
mu sync.Mutex // guards following, and calls to Read and Close
sawEOF bool
closed bool
earlyClose bool // Close called and we didn't read to the end of src
onHitEOF func() // if non-nil, func to call when EOF is Read
}
再看connReader的数据结构:
type connReader struct {
conn *conn
mu sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond *sync.Cond
inRead bool
aborted bool // set true before conn.rwc deadline is set to past
remain int64 // bytes remaining
}
先通过b.sawEOF判断是否读完了req.Body。读完req.Body之后,调用w.conn.r.startBackgroundRead函数,需要加锁。
cr.inRead标志位判断body是否被并发读,原则上不支持并发读。
cr.aborted终止读取,并设置连接截止时间为很久以前aLongTimeAgo,意图终止连接。
startBackgroundRead方法仅读取1个byte,忽略掉连接超时、网络错误、aborted==true的错误,报错则删除终止请求,即向res.closeNotifyCh写入true。
调用handler处理请求
释放ctx
调用cancelCtx的方法,释放ctx及其子节点的空间。
重用连接的处理
重用连接有两个条件:s.disableKeepAlive==0 并且s.inShutdown==0,通过在*Server服务端参数设置。
还可以在请求中配置重用连接,与上一个条件都达到连接才是可重用的。同时满足的条件为:
- 响应头"Connection: keep-alive"
- 如果有响应body,必须要写完。即请求方法非HEAD,且响应contentLength非-1,且写入body的类型符合的情况下,响应contentLength写入的字节数与resp.written相同
- 没有发生body.earlyClose错误,也主是未读完req.Body的情况
- 没有发生写入响应错误
所以综上所述,重用连接共有1,2,3共计6个条件,一个是通过客户端在Header中配置,其中两个是服务端的配置参数,其余三个条件是确保没有传输错误。
配置idleTimeout
将Server.idelTimeout()配置到conn.SetReadDeadline()。如果因超时未完成读取req.Body,或向连接写入response。其处理逻辑定位至上文“重用连接的处理”。
至此完成了一次http的请求处理流程。其中readRequest读包并解析 finishRequest刷缓存写包在后面继续更新。而serverHandler.ServeHTTP()是一上层mvc框架的复杂逻辑实现,单独写一个集合更新。
网友评论