美文网首页
fasthttp剖析

fasthttp剖析

作者: suoga | 来源:发表于2019-02-21 22:25 被阅读0次

先说点题外话,最近在开发公司级的网关,虽然没有明说,但是对于我们大家来说Nginx就是我们对标的对象。但是说实话,想要对标Nginx的性能,用Go开发基本上是不可能的,人家没有scheduler调度这一项就可以吊打Go了,更别说Go还有GC了。跑Benchmark的时候就能很明显地看到,随着并发请求的增多,Ngx的响应时间几乎就是一条完美的直线。而我们用Go开发的网关,在并发数小于等于阈值X的时候还能跟Ngx不相上下,虽然有周期性GC带来的毛刺,但是总体影响不大,毛刺主要影响的是99分位的响应时间。但是,一旦并发数超过阈值X,Go网关的响应时间便指数级地上升了。
这个现象,直接通过pprof观察可以发现,在并发数小于X时,总Goroutine数量基本保持稳定,但是一旦超过阈值X,goroutine数量则快速飙升上去。为什么呢?简单来说就是,由于Go的runtime需要调度goroutine(sleep哪个唤醒哪个抢占哪个),在Goroutine数量巨大的时候这个调度的开销非常大,每个goroutine被唤起的周期变得很长,因此会导致响应变长。同时,由于大量goroutine并没有完成其任务,导致无法回收,新到的请求就只能new goroutine,导致goroutine的数量进一步增加,使得响应时间进一步恶化…然后基本上服务就不可用了,pprof能够看到,绝大部分时间都在执行runtime.findRunableG。
这个现象让我想起了以前大学时代学习的二极管的雪崩击穿。虽然,Go号称goroutine非常轻量级,可以轻松地开到十万百万级,但是这话是省略了很多上下文和限制条件的。它只告诉你可以有millions of goroutine,但是没告诉你后果是啥,怎样才能开到millions,轻松开millions of goroutines是相对谁来说轻松…总之一句话就是,太美的承诺都不能信。

说回正题,fasthttp!为什么我进入主题之前说这么多题外话,本质上的目的就是想表明,对处理高并发场景的应用,goroutine的代价其实是不可忽视的,一定要省着用!fasthttp为什么比标准库net/http快,就是因为它并不是来一个请求就开一个goroutine,而是维护了一个workerPool,尽可能复用goroutine。当然还有很多别的优化,比如尽量减少数据copy,这些在fasthttp的API里就有很直观的体现。

先来简单看看fasthttp的大框架结构:

func (s *Server) Serve(ln net.Listener) error {
    var lastOverflowErrorTime time.Time
    var lastPerIPErrorTime time.Time
    var c net.Conn
    var err error
    // 略

    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
        connState:       s.setState,
    }
    wp.Start()

    atomic.AddInt32(&s.open, 1)
    defer atomic.AddInt32(&s.open, -1)

    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
      //略
        }
        s.setState(c, StateNew)
        atomic.AddInt32(&s.open, 1)
        if !wp.Serve(c) {
    // 略
        }
        c = nil
    }
}

上面是一个经过删减提炼后的代码,从上面可以看到,fasthttp在启动时先实例化并启动了一个workerPool,然后进入到了一个大循环中,也是每当accept一个连接之后就教给workerPool去Serve。
可以简单地对比以下标准库net/http对应逻辑的伪代码:

for {
    c,err := accept(s)
    go s.serve(c)
}

可以看到最大的区别就是fasthttp是wp.serve(c)而标准库是直接起一个goroutinego s.serve(c)。为什么不像标准库直接启动一个goroutine去处理呢?当然是为了优化啊!前面也说了,高并发下goroutine代价也是很高的,尽量复用goroutine。

接着我们来讲讲workerPool。和别的语言中线程池的实现思路基本一致,伪代码如下:


func init() {
    for i:=0 i<N; i++ {
      signalReceiver := make(chan net.Conn)
      ready = append(ready, signalReceiver)
      go job(signalReceiver)
    }
}

// 每个worker都有一个自己的channel,通过从channel中接收消息来获得执行权
func job(ch chan net.Conn) {
    for c := range ch {
        doSomething(c)
        //当完成任务后,把自己的channel放到ready队列里,表示自己是空闲状态
        lock()
        ready = append(ready, ch)
        unlock()
    }
}
var ready = []chan net.Conn{}

// 每次从ready队列里取一个空闲的channel,然后通知该job来执行任务
func serve(c net.Conn) {
  lock()
  readyJob := ready[len(ready)-1]
  ready = ready[:len(ready)-1]
  unlock()
  readyJob <- c
}

以上便是一个最简单的(问题多多)的workerPool。我们一开始启动了N个worker,每个worker都由一个自己的channel用于接收数据,然后一开始把所有worker的channel都放到ready队列里,表示所有的worker都处于空闲状态。每次接收到一个请求时,serve就通过ready去看哪个worker是空闲的,然后向那个worker的channel发消息,从而让该worker执行当前任务。

确保你完全明白上面的workerPool的实现思路,我们再继续看fasthttp的实现。
fasthttp的serve和我们上述基本一致:

func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

从wp里取一个channel,然后向该channel发消息,让对应的worker执行任务。我们这个具体看看fasthttp是怎么找到空闲worker的:

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

其实也是,一开始尝试从ready队列里取,如果ready队列里没有,但是当前worker数量还没有达到用户配置的MaxWorkersCount,那么就新起一个worker,否则就直接返回nil。这里新建worker还用到了临时对象池sync.Pool也就是代码中的wp.workerChanPool,能在两次gc之间复用对象,减少内存分配的开销。不过从这里也能看出,fasthttp的workerPool是lazyLoading的,并不是像我们之前的实现那样一开始就创建N个worker。这么做当然就是省内存啦,大部分业务大时间服务器都不会有这么高的并发压力,因此fasthttp作为通用框架,lazyLoading肯定是一个正确的选择!

这里的wp.workerFunc其实就是我们之前伪代码中的job函数,在里面监听channel消息,然后执行业务逻辑。我们可以具体看看:

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }

        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
        // 省略错误处理
        }
        if err == errHijacked {
            wp.connState(c, StateHijacked)
        } else {
            c.Close()
            wp.connState(c, StateClosed)
        }
        c = nil
        // 把ch放到ready队列里
        if !wp.release(ch) {
            break
        }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

从上面代码我们能够看到,每个job确实也是不断地监听channel,如果收到消息且不是nil,那就执行真正的业务逻辑。成功执行完之后(省略掉一些错误处理分支),通过wp.release把channel放到ready队列里。正常情况下,workerFunc会一直执行,直到收到一个nil或者执行出错,然后把workerPool的workersCount-1并退出,之后就等着runtime来回收或者释放goroutine了。

以上就是fasthttp的主要逻辑,没有什么特别的设计,和其它线程池的设计几乎是一模一样的。当然fasthttp的workerPool还有些需要注意的性质,从上面可以看出,每次release到ready队列时,直接放到队尾,每次取也是从队尾取。因此fasthttp的worker队列是FILO的,即先进后出。这会导致在并发小的情况下很多先入队的worker会一直空闲。因此fasthttp也支持设置IdleDuration参数,定期清理空闲的worker减少资源占用。这部分代码:

  func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn't serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

wp.Start中启动一个goroutine,定期执行wp.clean操作。wp.clean其实就是从头遍历ready队列,把空闲时间超过maxIdleWorkerDuration的都清理掉。这里清理也很简单,直接向该channel发送一个nil就行了。别忘了之前workFunc中,当收到一个nil之后就直接break出大循环,做些收尾工作然后退出函数,整个goroutine也就可以被runtime回收了。

不过这还不算完。我们再看看wp.WorkerFunc吧,这是一个很长的函数,其实就是s.serveConn(在初始化时把s.serveConn赋值给了wp.WorkerFunc),以下也是简化过的代码,我们只关注其hotpath:

func (s *Server) serveConn(c net.Conn) error {
  ctx := s.acquireCtx(c)
  var connRequestNum int
  for {
    connRequestNum++
    var br *bufio.Reader = acquireReader(ctx)
    err = ctx.Request.readLimitBody(br, maxRequestSize)
    err = s.Handler(ctx)
    var wr *bufio.Writer = acquireWriter(ctx)
    err = wr.Flush()
    if err != nil {
      return      
    }
    if s.MaxRequestNumPerConn {
      break;
    }
  }
// 省略
}

这里有个比较奇怪的地方,为什么要用一个无限循环呢?难道是接收网络包的分组之类的?NoNoNo,不要把概念搞混了!分组这些都是协议栈处理的内容,到Go这块直接就是应用层了。那为什么要无限循环呢?在我这篇文章里说过,只有通过三次握手新建的连接,才用Accept去取。建立好连接后,后续数据的收发都是基于该socket对象,也即net.Conn对象。

也就是说,只有新建的连接才会从Serve中的acceptConn函数开始,然后执行上述的逻辑。已经建立好的连接,后续的请求都在serveConn中循环处理。换句话说,如果一个HTTP请求是KeepAlive的(HTTP 1.1默认行为),那么worker就会一直处理此连接,无限循环地从该连接上读取数据(也就是下一个请求),然后进行业务逻辑。除非遇到connRequestNum >= MaxRequestNumPerConn或者其它错误了,才会关闭该连接,然后把自己设置为空闲。

这里还有个问题你可能会疑惑:由于并不知道下一次请求啥时候会发过来,这里只有一个ctx.Request.readLimitBody(br, maxRequestSize),并没有看到“不断尝试去读”这种逻辑呢?

这是一个好问题!!

其实这里的答案就是Go提供的一种强大的抽象net.Conn。它不仅仅是代表一个socket,同时它还被封装成了netPoller对象。netPoller是Go runtime的一个数据结构,也许你早已知道了linux的epoll,netPoller就是对epoll的一种封装。Go把socket注册到epoll里,后续当用户在net.Conn对象上调用Read时,实际上是这样的:

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if len(p) == 0 {
        // If the caller wanted a zero byte read, return immediately
        // without trying (but after acquiring the readLock).
        // Otherwise syscall.Read returns 0, nil which looks like
        // io.EOF.
        // TODO(bradfitz): make it wait for readability? (Issue 15735)
        return 0, nil
    }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }

            // On MacOS we can see EINTR here if the user
            // pressed ^Z.  See issue #22838.
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

也就是先会进行一次syscall.Read,但是如果没有数据,此时会得到一个错误syscall.EAGAIN。这时,会执行fd.pd.waitRead,这个函数会一直阻塞直到epoll通知socket有数据就绪。这里的阻塞和syscall的阻塞调用不一样,这里的阻塞相当于主动让出时间片(park),当前线程可以去执行别的goroutine,然后等待适当的时间(epoll event fire)被runtime唤醒。从用户的角度来看,这就像是阻塞的。

那么这是怎么做到的呢?这就涉及到runtime的调度了,具体可以参见这篇文章。顺带提一句,查看太深入到runtime的代码一定要用dlv、lldb或者gdb,用IDE会跳到错误的位置,因为很多runtime的代码直接和平台有关了,不同平台对应实现也不一样,然后链接器也会搞一些事情导致符号表在源代码层面不能正确跳转,所以一定要用单步调试去看代码。

以上便是对fasthttp源码结构的一个剖析,接下来让我们思考一个问题吧:

假设有N个客户端都使用长连接(http keepalive)发送请求,同时假设每个客户端每秒发送M个请求。那么此时fasthttp和net/http的性能会如何呢?

由于有N个客户端,因此fashttp和net/http的Server都会Accept N次。标准库会启动N个goroutine,而对于fasthttp来说,由于每个连接都是长连接,每个worker会一直处理该连接直到连接关闭或者次数到了限制,因此ready队列一直是空的,所以也会启动N个goroutine。即使N很大,这种case下fasthttp和标准库所使用的goroutine数量的持平的。
但是由于fasthttp大量使用了sync.Pool复用对象减少内存分配的开销,而标准库每个请求都会new一个request和response对象。同时fasthttp中大部分存的是[]byte而标准库中多是string,因此fasthttp还相比标准库减少了很多内存复制的开销。
总体而言,fasthttp性能在各种场景下应该都比标准库好很多。

当然还有个tips,作为网关,外网和内网的一道门,为了防止恶意请求,还是应该在response后主动close连接,也就是说应该使用短连接,这样才安全。

That's all!

相关文章

  • fasthttp剖析

    先说点题外话,最近在开发公司级的网关,虽然没有明说,但是对于我们大家来说Nginx就是我们对标的对象。但是说实话,...

  • fasthttp

    http://blog.jobbole.com/105927/

  • fasthttp

    转自:https://segmentfault.com/a/1190000009133154 goroutine ...

  • net/http与fasthttp区别

    fasthttp 是 Go 的一款不同于标准库net/http的 HTTP 实现。fasthttp 的性能可以达到...

  • Go FastHttp优雅关闭实现方案

    使用Go开发web服务时很多情况下都会使用号称比标准库快10x的FastHttp, 但fasthttp(版本: 2...

  • fasthttp client example

    说明 官方文档关于fasthttp的demo基本都是http server, http client的介绍基本没有...

  • golang workerpool 源码阅读

    今天读了一下 fasthttp 的源码,其中读到了 workpool ,做了一些注释。

  • fasthttp 获取client ip

    用fasthttp 获取客户端ip 的方法是 ctx.RemoteIP().String(),正常情况下这个获取是...

  • fasthttp原理简析

    fasthttp是golang下的一个http框架,顾名思义,与原生的http实现相比,它的特点在于快,按照官网的...

  • 使用 fasthttp 时要注意的两个点

    我们做的是聚合支付系统,使用的是fasthttp 作为http server, http client 也是使用f...

网友评论

      本文标题:fasthttp剖析

      本文链接:https://www.haomeiwen.com/subject/qusdyqtx.html