美文网首页程序员
tunny源码阅读

tunny源码阅读

作者: lizhuoming | 来源:发表于2018-11-14 01:03 被阅读0次

    前言

    最近在学习Go并发,在同学强烈推荐下,阅读了tunny源码。在此记录自己的理解和感想。

    tunny 基于Go实现的协程池

    要去理解一个东西,最快的方式莫过于先去熟悉使用它。那么,现在我们就开始使用它:假设我们现在的需求是输入一个字符串,将它与"Hello"拼接后打印并返回,要求用tunny实现。

        #在NewFunc函数中需要对传入的函数进行转换并执行
        printHello := func(str interface{}) interface{} {
            fmt.Println("Hello!", str)
            return "Hello! " + str.(string)
        }
        pool3 := tunny.NewFunc(3, func(payload interface{}) interface{} {
            f, ok := payload.(func())
            if !ok {
                return nil
            }
            f()
            return f
        })
        pool3.Process(printHello("lizhuoming"))
    
        #而NewCallback的Process函数封装了这个操作
        printHello := func(str interface{}) interface{} {
            fmt.Println("Hello!", str)
            return "Hello! " + str.(string)
        }
        pool2 := tunny.NewCallback(2)
        pool2.Process(printHello("lizhuoming"))
    
    #New的灵活度最高,我们可以定制自己的Worker
    type myWorker struct {
        processor func(interface{}) interface{}
    }
    
    func (w *myWorker) Process(payload interface{}) interface{} {
        return w.processor(payload)
    }
    
    func (w *myWorker) BlockUntilReady() {}
    func (w *myWorker) Interrupt()       {}
    func (w *myWorker) Terminate()       {}
    
    func main() {
        printHello := func(str interface{}) interface{} {
            fmt.Println("Hello!", str)
            return "Hello! " + str.(string)
        }
    
        pool1 := tunny.New(3, func() tunny.Worker {
            return &myWorker{
                processor: printHello,
            }
        })
        pool1.Process("lizhuoming")
    }
    

    源码分析

    在熟悉了tunny的使用后,我们通过代码来看看它是如何工作的吧~

    协程池的主要工作流程

    在我们创建并指定协程池容量后,协程池会启动指定容量个协程。它们竞争向一个channel中写入 workRequest(它充当一个桥梁,连接 Process 函数与真正执行任务的协程)。当你调用 Process 函数时,它会通过这个桥梁将任务传递给协程,并在任务结束后,接收到协程返回的结果。
    下面,我们来了解它的具体实现吧

    桥梁以及Process函数与协程之间的通信实现

    type Worker interface {
        // 执行任务
        Process(interface{}) interface{}
        // 在执行任务前执行,相当于init
        BlockUntilReady()
        // 在任务执行时被终止时,会执行该函数
        Interrupt()
        // 当协程被关闭时,执行该函数
        Terminate()
    }
    
    //协程池
    type Pool struct {
            //正在执行的任务数量
            queuedJobs int64
            ctor    func() Worker
            workers []*workerWrapper
            //所有运行的协程会竞争向该channel写入workRequest
            reqChan chan workRequest
            workerMut sync.Mutex
    }
    
    //桥梁载体
    type workRequest struct {
            //接收任务的channel
            jobChan chan<- interface{}
            //返回结果的channel
            retChan <-chan interface{}
            //中断协程的执行
            interruptFunc func()
    }
    
    //负责管理worker(stop函数)和goroutine(interrupt函数)的整个生命周期
    type workerWrapper struct {
            worker        Worker
            interruptChan chan struct{}
            // workerWrapper 和 Pool 的reqChan是同一个(channel是引用传递)
            reqChan chan<- workRequest
            closeChan chan struct{}
            closedChan chan struct{}
    }
    
    func (p *Pool) ProcessTimed(
        payload interface{},
        timeout time.Duration,
    ) (interface{}, error) {
        atomic.AddInt64(&p.queuedJobs, 1)
        defer atomic.AddInt64(&p.queuedJobs, -1)
    
        tout := time.NewTimer(timeout)
    
        var request workRequest
        var open bool
    
        select {
        //读取桥梁载体
        case request, open = <-p.reqChan:
            if !open {
                return nil, ErrPoolNotRunning
            }
        //超时处理
        case <-tout.C:
            return nil, ErrJobTimedOut
        }
    
        select {
        //通过桥梁载体将任务传给协程
        case request.jobChan <- payload:
        case <-tout.C:
            //调用 workerWrapper 的 interrupt 方法,结束函数执行
            request.interruptFunc()
            return nil, ErrJobTimedOut
        }
    
        select {
        //接收返回数据
        case payload, open = <-request.retChan:
            if !open {
                return nil, ErrWorkerClosed
            }
        case <-tout.C:
            //调用 workerWrapper 的 interrupt 方法,结束函数执行
            request.interruptFunc()
            return nil, ErrJobTimedOut
        }
    
        tout.Stop()
        return payload, nil
    }
    
    func (w *workerWrapper) run() {
        jobChan, retChan := make(chan interface{}), make(chan interface{})
        defer func() {
            w.worker.Terminate()
            close(retChan)
            close(w.closedChan)
        }()
    
        for {
            w.worker.BlockUntilReady()
            select {
            // 给channel中写入桥梁载体,为协程私有
            case w.reqChan <- workRequest{
                jobChan:       jobChan,
                retChan:       retChan,
                interruptFunc: w.interrupt,
            }:
                select {
                //尝试读取任务
                case payload := <-jobChan:
                    result := w.worker.Process(payload)
                    select {
                    case retChan <- result:
                    case <-w.interruptChan:
                        w.interruptChan = make(chan struct{})
                    }
                //执行被中断,新建中断 channel
                case _, _ = <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            // 协程被关闭
            case <-w.closeChan:
                return
            }
        }
    }
    

    协程池如何保证协程数恒定

    func (p *Pool) SetSize(n int) {
        p.workerMut.Lock()
        defer p.workerMut.Unlock()
    
        lWorkers := len(p.workers)
        if lWorkers == n {
            return
        }
        // 给池中添加协程
        for i := lWorkers; i < n; i++ {
            p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
        }
        // 异步关闭超出的协程
        for i := n; i < lWorkers; i++ {
            p.workers[i].stop()
        }
        // 同步等待所有超出协程都关闭完成
        for i := n; i < lWorkers; i++ {
            p.workers[i].join()
        }
        p.workers = p.workers[:n]
    }
    

    一些感想

    代码就说到这儿了,下面来谈谈我的感想:
    (1)不得不说人家的代码健壮性真好,以后自己在写代码时也要借鉴人家的经验
    (2)通过对 workerWrapper 和 workRequest 的设计和逻辑的拆分,使代码解耦,并且这样的代码逻辑看起来是非常清晰的

    相关文章

      网友评论

        本文标题:tunny源码阅读

        本文链接:https://www.haomeiwen.com/subject/aceffqtx.html