美文网首页
tunny源代码阅读

tunny源代码阅读

作者: 灰化肥发黑会挥发 | 来源:发表于2020-12-02 21:15 被阅读0次

    线程池——tunny

    代码:https://github.com/Jeffail/tunny

    代码目录
    tunny-master
    ├── LICENSE
    ├── README.md 
    ├── go.mod
    ├── tunny.go
    ├── tunny_logo.png
    ├── tunny_test.go
    └── worker.go
    

    主要的代码文件有两个,tunny.go 线程池相关,worker.go 消费者相关逻辑,先看tunny.go的类对象

    type Pool struct {
        queuedJobs int64  // 当前池子中的任务数量
    
        ctor    func() Worker // 用户具体要执行的方法,处理输入参数,获取结果
        workers []*workerWrapper // 维护有目前有多少线程运行,即线程池大小的worker集合
        reqChan chan workRequest // 维护有多少空闲的线程,只要有空闲的worker就加入到这个channel中
    
        workerMut sync.Mutex
    }
    

    再看worker.go的方法,主要有两个对象

    type workRequest struct {
        // jobChan is used to send the payload to this worker.
        jobChan chan<- interface{}  // 用于放worker的payloda参数
    
        // retChan is used to read the result from this worker.
        retChan <-chan interface{} // 用于放worker的处理结果
    
        // interruptFunc can be called to cancel a running job. When called it is no
        // longer necessary to read from retChan.
        interruptFunc func()// 用于取消worker
    }
    
    type workerWrapper struct {
        worker        Worker // worker类对象,有客户自己实现的process方法
        interruptChan chan struct{} 
    
        // reqChan is NOT owned by this type, it is used to send requests for work.
        reqChan chan<- workRequest // 和Pool的reqChan是一个,当前worker如果空闲,则加入到reqChan中,通知pool
    
        // closeChan can be closed in order to cleanly shutdown this worker.
        closeChan chan struct{} // 用于通知worker要被关闭掉
    
        // closedChan is closed by the run() goroutine when it exits.
        closedChan chan struct{} // 用于控制worker是否已经被关闭
    }
    

    再看worker.go的其它方法:

    func newWorkerWrapper(reqChan chan<- workRequest, worker Worker,) *workerWrapper {
        w := workerWrapper{
            worker:        worker,
            interruptChan: make(chan struct{}),
            reqChan:       reqChan,
            closeChan:     make(chan struct{}),
            closedChan:    make(chan struct{}),
        }
    
        go w.run()
    
        return &w
    }
    

    上面是创建workerWrapper方法,输入参数

    1)reqChan:和pool的reqChan是同一个,用于控制当前的workerWrapper是否是空闲的,

    2)worker是具体需要执行的方法对应的接口,worker的接口方法process被用户实现;

    其中该方法会执行run方法,这个方法比较细节,具体代码:

    func (w *workerWrapper) run() {
        jobChan, retChan := make(chan interface{}), make(chan interface{})
        defer func() {
            w.worker.Terminate()
            close(retChan)
            close(w.closedChan)
        }()
    
        for {
            // NOTE: Blocking here will prevent the worker from closing down.
            w.worker.BlockUntilReady()
            select {
            case w.reqChan <- workRequest{
                jobChan:       jobChan,
                retChan:       retChan,
                interruptFunc: w.interrupt,
            }:
                select {
                case payload := <-jobChan:
                    result := w.worker.Process(payload)
                    select {
                    case retChan <- result:
                    case <-w.interruptChan:
                        w.interruptChan = make(chan struct{})
                    }
                case _, _ = <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case <-w.closeChan:
                return
            }
        }
    }
    

    首先创建两个jobChan和retChan,jobChan用于存储需要执行的参数, retChan用于存储函数执行参数后的结果; 然后有一个defer函数,将当前worker关闭,如果pool stop 这个worker,则会退出。然后有一个for会一直执行里面的逻辑,

            case w.reqChan <- workRequest{
                jobChan:       jobChan,
                retChan:       retChan,
                interruptFunc: w.interrupt,
            }:
           select {
                case payload := <-jobChan:
                    result := w.worker.Process(payload)
                    select {
                    case retChan <- result:
                    case <-w.interruptChan:
                        w.interruptChan = make(chan struct{})
                    }
                case _, _ = <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
    

    然后会往reqChan中插入一个workRequest,跳到这个case里面的逻辑:

     select {
                case payload := <-jobChan:
              result := w.worker.Process(payload)
              select {
              case retChan <- result:
              case <-w.interruptChan:
              w.interruptChan = make(chan struct{})
              }
                case _, _ = <-w.interruptChan:
                        w.interruptChan = make(chan struct{})
                }
    

    这块会阻塞等待往jobChan的里面塞一个东西(这块是pool里面的逻辑),然后payload := <- jobChan取到这个值,继续走里面的逻辑:

     result := w.worker.Process(payload)
              select {
              case retChan <- result:
    

    使用worker处理参数,并把结果放入到retChan中, pool那边会阻塞等待取到结果,返回给用户。

    这个run方法其实和pool有很大的关系, 可以接着看pool的方法,

    func (p *Pool) Process(payload interface{}) interface{} {
        atomic.AddInt64(&p.queuedJobs, 1)
      /* 从chan中取出空闲的workerRequest 这块对应 worker run中的 
            case w.reqChan <- workRequest{
                jobChan:       jobChan,
                retChan:       retChan,
                interruptFunc: w.interrupt,
            }
            */
        request, open := <-p.reqChan 
        if !open {
            panic(ErrPoolNotRunning)
        }
        
      /* 将用户传输的payload传递给request的jobChan, 这块对应 worker run中的 
                case payload := <-jobChan:
                    result := w.worker.Process(payload)
      */
        request.jobChan <- payload
    
        /* 获取request中的结果,这块对应 worker run中的 
                select {
                    case retChan <- result:
      */
        payload, open = <-request.retChan
        if !open {
            panic(ErrWorkerClosed)
        }
    
        atomic.AddInt64(&p.queuedJobs, -1)
        return payload
    }
    

    上面的这段逻辑基本上是线程池之间数据流转的核心, 除此之外还有些其它功能,例如设置 pool的大小,需要注意,如果当前的进程数量小于设置的大小,需要增加worker, 否则需要减少, 这块代码如下

    func (p *Pool) SetSize(n int) {
        p.workerMut.Lock() // 加锁
        defer p.workerMut.Unlock()
    
        lWorkers := len(p.workers) // 计算当前的worker数量
        if lWorkers == n {
            return
        }
    
        // Add extra workers if N > len(workers)
        for i := lWorkers; i < n; i++ {
            p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
        } 
    
      // 下面两个for循环都是为了停掉多余的worker,这块调用了stop和join,下面会继续分析,这块是我觉得可以学习的一点
        // Asynchronously stop all workers > N
        for i := n; i < lWorkers; i++ {
            p.workers[i].stop()
        }
    
        // Synchronously wait for all workers > N to stop
        for i := n; i < lWorkers; i++ {
            p.workers[i].join()
        }
    
        // Remove stopped workers from slice
        p.workers = p.workers[:n]
    }
    

    下面分析stop和join方法, 需要结合 run方法一起看

    /*
    调用stop,会关闭掉closeChan
    这时候run函数的对应逻辑为:
            case <-w.closeChan:
                return
    在返回之前 run有一个defer方法
        defer func() {
            w.worker.Terminate()
            close(retChan)
            close(w.closedChan)
        }()
    */
    func (w *workerWrapper) stop() {
        close(w.closeChan)
    }
    
    /*
    这块是上面注释中解释的defer执行结束前, 会一直阻塞。
    */
    func (w *workerWrapper) join() {
        <-w.closedChan
    }
    
    
    func (w *workerWrapper) run() {
        jobChan, retChan := make(chan interface{}), make(chan interface{})
        defer func() {
            w.worker.Terminate()
            close(retChan)
            close(w.closedChan)
        }()
    
        for {
            // NOTE: Blocking here will prevent the worker from closing down.
            w.worker.BlockUntilReady()
            select {
            case w.reqChan <- workRequest{
                jobChan:       jobChan,
                retChan:       retChan,
                interruptFunc: w.interrupt,
            }:
                select {
                case payload := <-jobChan:
                    result := w.worker.Process(payload)
                    select {
                    case retChan <- result:
                    case <-w.interruptChan:
                        w.interruptChan = make(chan struct{})
                    }
                case _, _ = <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case <-w.closeChan:
                return
            }
        }
    }
    

    除此以外,源代码中还使用了接口来定义worker, 并实现了closureWorker

    type Worker interface {
        // Process will synchronously perform a job and return the result.
        Process(interface{}) interface{}
    
        // BlockUntilReady is called before each job is processed and must block the
        // calling goroutine until the Worker is ready to process the next job.
        BlockUntilReady()
    
        // Interrupt is called when a job is cancelled. The worker is responsible
        // for unblocking the Process implementation.
        Interrupt()
    
        // Terminate is called when a Worker is removed from the processing pool
        // and is responsible for cleaning up any held resources.
        Terminate()
    }
    
    //------------------------------------------------------------------------------
    
    // closureWorker is a minimal Worker implementation that simply wraps a
    // func(interface{}) interface{}
    type closureWorker struct {
        processor func(interface{}) interface{} // processor 在申明结构体的时候会赋值给用户具体实现的函数
    }
    
    func (w *closureWorker) Process(payload interface{}) interface{} {
        return w.processor(payload) // 非直接处理, 而是用这种委托的方式感觉比较好
    }
    
    func (w *closureWorker) BlockUntilReady() {}
    func (w *closureWorker) Interrupt()       {}
    func (w *closureWorker) Terminate()       {}
    
    //---------------
    

    值得学习的点:

    1. 线程池和worker之间的交互使用同一个reqChan, 并且两边分别都阻塞等待下一步执行完。
    2. worker的关闭使用两个chan来控制
    3. 将ctor交给每个worker, 使用interface
    4. 面向接口编程,申明worker

    欢迎一起探讨:微信联系方式 13161411563

    参考资料

    [1] https://www.shangmayuan.com/a/839f766980484fd88facf32c.html

    相关文章

      网友评论

          本文标题:tunny源代码阅读

          本文链接:https://www.haomeiwen.com/subject/olauwktx.html