美文网首页应用微服务golang
Go:每分钟处理百万请求

Go:每分钟处理百万请求

作者: Go语言由浅入深 | 来源:发表于2021-04-11 17:43 被阅读0次

    【译文】原文地址

    问题

    从事匿名遥测和分析系统,我们的目标是能够处理来自大量客户端的POST请求。我们的web服务将接收JSON文档内容包括很多的负载需要发送到亚马逊S3存储,为了后续使用map-reduce来处理这些数据。

    传统方式我们将创建worker-tier架构,使用包含如下中间件:

    • SideKiq
    • Resque
    • DelayedJob
    • Elasticbeanstalk worker tier
    • RabbitMQ
      搭建两个集群,一个部署前端另一个用于workers,这样就可以通过扩展来应对大量后端任务。自开始,团队就知道应该使用Go,因为在讨论的过程中就看出来可能是个很大流量的系统。作者使用go大概两年,也开发了一些系统但是没有处理过这种大流量的。

    起初我们创建一些struct来定义web服务POST请求接收的负载和上传数据到S3桶的方法。

    type PayloadCollection struct {
        WindowsVersion  string    `json:"version"`
        Token           string    `json:"token"`
        Payloads        []Payload `json:"data"`
    }
    
    type Payload struct {
        // 待实现
    }
    
    func (p *Payload) UploadToS3() error {
        // the storageFolder method ensures that there are no name collision in
        // case we get same timestamp in the key name
        storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
    
        bucket := S3Bucket
    
        b := new(bytes.Buffer)
        encodeErr := json.NewEncoder(b).Encode(payload)
        if encodeErr != nil {
            return encodeErr
        }
    
        // Everything we post to the S3 bucket should be marked 'private'
        var acl = s3.Private
        var contentType = "application/octet-stream"
    
        return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
    }
    

    简单的Go协程方法

    开始我们使用一个很简单的POST handler来实现,仅将任务放进一个简单goroutine中来并行处理。

    func payloadHandler(w http.ResponseWriter, r *http.Request) {
    
        if r.Method != "POST" {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }
    
        // 将body读取到字符串并使用json解码
        var content = &PayloadCollection{}
        err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=UTF-8")
            w.WriteHeader(http.StatusBadRequest)
            return
        }
    
        //迭代每个payload并逐个上传到S3
        for _, payload := range content.Payloads {
            go payload.UploadToS3()   // <----- DON'T DO THIS
        }
    
        w.WriteHeader(http.StatusOK)
    }
    

    对于流量不是很大的情况,可以应对大多数人请求,但是在大规模情况下很快上面的方法就被证明不是很好了。我们预期有很多的请求,但和我们部署第一个版本到生产环境中所看到的不一样。我们完全低估了流量。以上方法有很多不足地方。无法控制goroutine的数量。当达到每分钟1百万POST请求的时候,代码直接瘫痪了。

    再次优化

    需要另找方法。一开始我们就讨论如何保持处理请求生命周期非常短,并在后台处理。当然这个在Ruby中是必须做的,否则将阻塞所有可用的worker。然后我们就使用常规解决方案来做,比如Resque、Sidekiq、SQS等。很多处理这种问题的方法。

    因此第二个版本通过创建带缓冲的channel,这样就可以缓存一些jobs,并逐步上传到S3,而且可以控制缓存队列的长度,有足够内存也能够存放这些job。我们认为将job存放到channel队列中是可以的。

    var Queue chan Payload
    
    func init() {
        Queue = make(chan Payload, MAX_QUEUE)
    }
    
    func payloadHandler(w http.ResponseWriter, r *http.Request) {
        ...
        // Go through each payload and queue items individually to be posted to S3
        for _, payload := range content.Payloads {
            Queue <- payload
        }
        ...
    }
    

    然后消费队列处理jobs,使用类似如下方式:

    func StartProcessor() {
        for {
            select {
            case job := <-Queue:
                job.payload.UploadToS3()  // <-- STILL NOT GOOD
            }
        }
    }
    

    老实说,我对我们所想的一点底都没有。这个方法并没有让我们轻松,我们用缓存来应对并发只是延缓了问题的爆发。同步处理机制,每次上传一个负载到S3,因为接受请求的速度太快,远比一个处理器上传到S3点速度快,导致缓存很快就挤满了,导致后面来的请求直接阻塞。我们只是在回避这个问题,直到倒计时我们的系统最终死亡。在我们部署了这个有缺陷的版本之后,我们的延迟率在几分钟内以恒定的速率不断增加。


    更好的方法

    我们决定在使用go channel时利用一个通用的模式,创建一个两层的channel系统,一个用来存放jobs另一个用于控制并发处理job队列的workers的数量。为了保持一定层度的并发上传数据到S3,一方面不会使系统拖垮,另一方面不会出现连接S3错误。因此我们选择创建一个job/worker模式。这在java、C#等中经常使用。考虑以golang方式实现可以通过使用channels来代替worker线程池。

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )
    
    // Job represents the job to be run
    type Job struct {
        Payload Payload
    }
    
    // A buffered channel that we can send work requests on.
    var JobQueue chan Job
    
    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool  chan chan Job
        JobChannel  chan Job
        quit        chan bool
    }
    
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }
    
    //Start方法通过循环监听任务请求和停止信号。
    // case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                            //注册当前worker到worker队列
                w.WorkerPool <- w.JobChannel
    
                select {
                case job := <-w.JobChannel:
                    // 接收到一个工作请求
                    if err := job.Payload.UploadToS3(); err != nil {
                        log.Errorf("Error uploading to S3: %s", err.Error())
                    }
    
                case <-w.quit:
                    // 接收到停止工作信号
                    return
                }
            }
        }()
    }
    
    //Stop方法通知worker停止监听工作请求
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
    

    修改web请求处理函数,创建一个Job结构体实例并将Job实例发送到JobQenue channel供worker处理。

    func payloadHandler(w http.ResponseWriter, r *http.Request) {
    
        if r.Method != "POST" {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }
    
        //将body读取到字符串并使用json解码
        var content = &PayloadCollection{}
        err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=UTF-8")
            w.WriteHeader(http.StatusBadRequest)
            return
        }
    
        // 迭代每个payload并逐个上传到S3
        for _, payload := range content.Payloads {
    
           //创建Job实例
            work := Job{Payload: payload}
    
           //将worker发送到队列
            JobQueue <- work
        }
    
        w.WriteHeader(http.StatusOK)
    }
    

    在web服务器初始化的时候创建一个Dispather和调用Run()来创建workers池,并开始监听JobQueue中jobs。

    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    

    以下是dispatcher实现:

    type Dispatcher struct {
        WorkerPool chan chan Job
    }
    
    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool}
    }
    
    func (d *Dispatcher) Run() {
        // starting n number of workers
        for i := 0; i < d.maxWorkers; i++ {
            worker := NewWorker(d.WorkerPool)
            worker.Start()
        }
    
        go d.dispatch()
    }
    
    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue:
                // a job request has been received
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-d.WorkerPool
    
                    // dispatch the job to the worker job channel
                    jobChannel <- job
                }(job)
            }
        }
    }
    

    注意我们设置了workers的实例最大数量,并添加到worker池中。因为我们项目在docker环境中使用了亚马逊Elasticbeanstalk,所以在生产环境下我们总是遵循12-factor原则来配置我们系统,通过环境变量的方式读取配置。这种方式我们可以控制workers的数量和JobQueue的长度,因此可以快速调整这些值不需要重新部署集群。

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )
    

    中间结果

    在部署上面的优化方案之后,很快我们就发现延时下降到可接受范围之内,并且可以处理波动很大的请求量。



    在几分钟后当弹性负载均衡起作用后,看到ElasticBeanstalk应用服务处理近1百度请求每分钟。经常在早上几小时流量还能飚升到超过一百万每分钟。

    新代码部署后,服务器的数量很快从100多个降20个。


    总结

    在这里使用了简单的方法。本来我们可以设计复杂的系统包含很多的队列、后台workers、复杂的部署,但是我们决定使用Elasticbeanstalk自动扩展能力和go提供的高效简单的并发方法。

    相关文章

      网友评论

        本文标题:Go:每分钟处理百万请求

        本文链接:https://www.haomeiwen.com/subject/rmpakltx.html