美文网首页
初探go的协程池

初探go的协程池

作者: 住山洞的阿柔 | 来源:发表于2019-02-25 14:45 被阅读0次

    为什么需要协程池

    虽然go语言在调度Goroutine已经优化的非常完善,开启一个Goroutine的代价非常小。但是,如果无休止的开辟Goroutine依然会出现高频率的调度Goroutine,那么依然会浪费很多上下文切换的资源。所以设计一个Goroutine池限制Goroutine数量是非常有必要的。

    具体实现

    先定义Job和Worker作为协程池控制的最基本单元。之前正好在学习网络编程,就用协程池来做了个实验。就借此来看看协程池的具体实现。

    //协程池的最小工作单元,即具体业务处理结构体
    type Job struct {
        Connection net.Conn  //客户端的连接
    }
    
    //队列,用来接收、发送请求
    var JobQueue chan Job
    
    //用于执行job,可以理解为job的管理者
    type Worker struct {
        WorkerPool chan chan Job
        JobChannel chan Job
        quit chan bool
    }
    
    //初始化Worker
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker {
            WorkerPool:workerPool,
            JobChannel:make(chan Job),
            quit:make(chan bool),
        }
    }
    
    //运行Worker
    func (w Worker) Start() {
        go func() {
            for {
                //将可用的worker放进队列中
                w.WorkerPool  <- w.JobChannel
                select {
                case job := <- w.JobChannel:
                    //接收到具体请求时进行处理
                    HandleConnection(job.Connection)
                case <-w.quit:
                    //接收停止请求
                    return
                }
            }
        } ()
    }
    
    //发送停止请求
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
    

    接下来,定义分配worker的结构体dispatcher。

    type Dispatcher struct {
        WorkerPool chan chan Job    //worker的池子,控制worker的数量
        WorkerList []Worker         //worker的切片
    }
    
    //根据传入的值,创建对应数量的channel
    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{
            WorkerPool:pool,
        }
    }
    
    //根据最大值,创建对应数量的worker
    func (d *Dispatcher) Run() {
        for i := 0; i < MaxWorkers; i++ {
            worker := NewWorker(d.WorkerPool)
            worker.Start()
            d.WorkerList = append(d.WorkerList, worker)
        }
        //监听工作队列
        go d.dispatch()
    }
    
    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue:
                go func (job Job) {
                    jobChannel := <-d.WorkerPool
                    jobChannel <- job
                }(job)
            }
        }
    }
    
    //停止所有的worker
    func (d *Dispatcher) Stop() {
        for _, worker := range d.WorkerList {
            worker.Stop()
        }
    }
    

    以下是主函数的代码。

    func main() {
        l, e := net.Listen("tcp",":3207")
        if e != nil {
            fmt.Println(e)
            return
        }
    
        //创建dispatcher
        dispatcher := routinePool.NewDispatcher(routinePool.MaxWorkers)
        dispatcher.Run()
        //初始化工作队列
        routinePool.JobQueue = make(chan routinePool.Job, routinePool.MaxQueue)
    
        defer l.Close()
        defer dispatcher.Stop()
    
        for {
            //接受客户端的连接
            conn, err := l.Accept()
            if err != nil {
                return
            }
    
            job := routinePool.Job{
                Connection:conn,
            }
            //客户端连接放入工作队列
            routinePool.JobQueue <- job
        }
    }
    

    对于客户端请求的处理,我这里只做了最简单的打印处理。

    //解包
    func Unpack(buffer []byte, readerChannel chan []byte) []byte {
        length := len(buffer)
    
        var i int
        for i = 0; i < length; i++ {
            if length < i + DataLen {
                break
            }
            //根据长度来获取数据
            messageLen := BytesToInt(buffer[i:i+DataLen])
            if length < i + DataLen + messageLen {
                break
            }
            data := buffer[i+DataLen:i+DataLen+messageLen]
            readerChannel <- data
    
            i += DataLen + messageLen - 1
        }
    
        if i == length {
            return make([]byte, 0)
        }
        return buffer[i:]
    }
    
    //字节转换成整形
    func BytesToInt(b []byte) int {
        bytesBuffer := bytes.NewBuffer(b)
    
        var x int32
        binary.Read(bytesBuffer, binary.BigEndian, &x)
    
        return int(x)
    }
    
    //处理客户端请求
    func HandleConnection(conn net.Conn) {
        defer func() {
            fmt.Println(conn.RemoteAddr())
            conn.Close()
        }()
        tempBuffer := make([]byte, 0)
        readerChannel := make(chan []byte, 16)
        //fmt.Println(conn.RemoteAddr())
        go reader(readerChannel)
    
        buffer := make([]byte, 1024)
        for {
            n, err := conn.Read(buffer)
            if err != nil {
                return
            }
            tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
        }
    }
    
    func reader(readerChannel chan []byte) {
        for {
            select {
            case data := <- readerChannel:
                //fmt.Println(string(data))
                data = data
            }
        }
    }
    

    这是几个用到的常量。

    const MaxWorkers = 100000
    
    const MaxQueue = 3000
    
    const DataLen = 4
    

    文章和实现参考了用go一分钟处理百万请求

    相关文章

      网友评论

          本文标题:初探go的协程池

          本文链接:https://www.haomeiwen.com/subject/fdcgyqtx.html