美文网首页
Go 语言的 Channel - 源码分析

Go 语言的 Channel - 源码分析

作者: 达菲格 | 来源:发表于2018-12-15 15:32 被阅读0次

    这部分看的是 golang1.2 的源码(其实是 3 年前看的,最近又拿出来复习整理了下),C语言实现的。

    如果只是为了学习,而不是为了实际参与项目的开发,还是建议阅读低版本源代码,因为更纯粹些。

    越是新的版本,里面掺杂的周边逻辑越多,比如 race, debug, profile 等等,这些周边功能就像打日志一样,到处都是,对阅读源码有较强的干扰。

    基本原理

    Channel 的内部主要结构挺简单的,本质上就是一个循环队列。如下图:

    Channel 内部结构

    使用一个循环队列来实现channel buffer,如果是非buffer channel,那么这个队列长度为0.

    dataqsiz表示队列总长度,即cap(ch), dataq表示 buffer 中元素的个数,即len(ch)

    sendxrecvx分别表示队列的游标,表示队列中元素的头和尾。

    channel 还拥有两个 goroutine 链表,sendqrecvq;链表中存放的是因该 channel 阻塞住的 goroutine。
    一个 goroutine 操作这个channel(发送或者读取),如果发生阻塞,都会放到相应的链表中;如果没发生阻塞,会尝试从相应的链表中唤醒一个goroutine。

    ch <- 1为例子,流程大致如下:

    1. 检查channel是否能正常写入(即队列没有满),如果能则运行第2步, 否则执行第3步.
    2. 将数据写入buffer, 唤醒 recvq 中的一个 goroutine,并把 recvx 处的数据 copy 给刚唤醒的 goroutine.
    3. 把自己加到 sendq 中,然后 block. 该 goroutine 会等到后续的 <-ch 操作被叫醒.

    var ch chan int

    注意看下面代码的注释。

    // 对goroutine的一个封装,专门给channel使用
    struct  SudoG 
    {
            G*      g;
            SudoG*  link;
            byte*   elem;
            ...
    };
    struct  WaitQ   // goroutine队列, 链表结构
    {
            SudoG*  first;
            SudoG*  last;
    };
    
    struct  Hchan // 我们的channel,各个成员都在注释中说明
    {
            uintgo  qcount;    // len(ch)
            uintgo  dataqsiz;  // cap(ch)
            uint16  elemsize;  // 单个元素大小,因为我们声明的是 chan int 类型,这里就是 sizeof(int)
            bool    closed;    // 是否关闭, 执行 close(ch) 后就关闭了
            uintgo  sendx;     // send index for buffer
            uintgo  recvx;     // receive index for buffer
            WaitQ   recvq;     // 接收的goroutine队列,单链表
            WaitQ   sendq;     // 发送的goroutine队列,单链表
            Lock;
    };
    

    make(chan int, 10)

    runtime·makechan_c(ChanType *t, int64 hint) {
        // ....
            c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0);
            c->elemsize = elem->size;
            c->dataqsiz = hint;
            // ...
            return c
    }
    

    我们可以看到,make一个channel,就是开辟了一块连续的内存空间,内存的大小就是Hchan结构本身 加上 channel buffer大小: sizeof(c) + hint*elem->size

    ch <- 1

    写入操作,这个函数很长,下面分成了几个分支来说明

    当 ch 是 nil 时

    channel是一个nil值, 会使 goroutine 阻塞住

    if(c == nil) {
        //...
        runtime·park(nil, nil, "chan send (nil chan)");
        return;  // not reached
    }
    

    如果 ch 已经被 close 了

    channel 已经关闭,panic

            runtime·lock(c);
            if(c->closed)
                    goto closed; // unlock an panic
            // ....
    closed:
            runtime·unlock(c);
            runtime·panicstring("send on closed channel");
    

    如果 ch 是 buffer 的

    处理带 buffer 的channel,仔细看注释

    runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
            if(c->dataqsiz > 0) // 带buffer 的 channel
                    goto asynch; // 异步
            // ...
    asynch:
            if(c->closed)
                    goto closed;
            if(c->qcount >= c->dataqsiz) { // buffer 满了
                    enqueue(&c->sendq, &mysg); // 放到sendq中
                    runtime·park(runtime·unlock, c, "chan send");// 阻塞
                    goto asynch; // 回去重新检查
            }
    
           // 把数据copy到buffer中
            c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
            if(++c->sendx == c->dataqsiz) // 修改队列游标
                    c->sendx = 0;
            c->qcount++;
    
            sg = dequeue(&c->recvq); // 把 recvq 里的找到一个goroutine
            if(sg != nil) {
                    gp = sg->g;
                    runtime·ready(gp); // 唤醒
            }
    }
    

    ch 不是 buffer 的

    非 buffer channel 的处理方式

    runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
            sg = dequeue(&c->recvq); // 从 recvq 获取一个等待该channel的goroutine
            if(sg != nil) {
                    c->elemalg->copy(c->elemsize, sg->elem, ep); // 把值 copy 给这个 goroutine
                    runtime·ready(gp); // 唤醒
                    return;
            }
            enqueue(&c->sendq, &mysg); // 把自己放到发送队列 sendq 中
            runtime·park(runtime·unlock, c, "chan send"); // 阻塞自己
    }
    

    v, ok := <-ch

    和上面的过程几乎是一模一样,不重复了。唯独有点不同的就是对关闭 channel 的处理。从一个已关闭的 channel 中读取数据,是不会 panic 的,而是直接返回。

    closed:
            if(ep != nil)
                    c->elemalg->copy(c->elemsize, ep, nil);
            if(selected != nil) // 如果在select语句里
                    *selected = true;
            if(received != nil) // ok 值
                    *received = false;
            runtime·unlock(c);
    

    select

    结构

    truct   Scase
    {
        SudoG   sg;         // must be first member (cast to Scase)
        Hchan*  chan;           // chan
        byte*   pc;         // return pc
        uint16  kind; // 类型, recv或send或default
        bool*   receivedp;      // pointer to received bool (recv2)
    };
    
    struct  Select
    {
        uint16  tcase;          // total count of scase[]
        uint16  ncase;          // currently filled scase[]
        uint16* pollorder;      // case poll order
        Hchan** lockorder;      // channel lock order
        Scase   scase[1];       // one per case (in order of appearance)
    }
    

    pollorder是用来遍历lockorder的,为了实现乱序,把pollorder设置成乱序的数组,然后用其值作为索引遍历lockeorder。支持乱序的原因是,避免如果第一个 channel 总是有数据,那其他的 channel case 就永远没机会执行了。

    乱序

    select 中所有 case 如果都满足达到非阻塞条件,哪个会被执行是随机的。这个随机是在程序里故意实现的。下面是一个乱序算法。

    for(i=0; i<sel->ncase; i++)
            sel->pollorder[i] = i;
    for(i=1; i<sel->ncase; i++) {
            o = sel->pollorder[i];
            j = runtime·fastrand1()%(i+1);
            sel->pollorder[i] = sel->pollorder[j];
            sel->pollorder[j] = o;
    }
    

    三次循环

    Select的源代码函数里,有三个主要的循环。太长了这里不贴代码了。

    1. 循环检查(乱序遍历)所有 case 看是否有满足的channel,有就直接执行,然后return,否则执行第2步。
    2. 把 goroutine(也就是自己)加到所有 case 的 channel 的发送或接收队列中,然后阻塞,等待被叫醒。
    3. 被其中一个 case 的 channel 唤醒,把自己从其他所有 case 的 channel 的队列中删除,设置 PC 值,即被唤醒后进入哪个 case。

    看着效率很差,但我们一般在代码中也写不了多少个 case,一般都是三五个,也不会导致性能下降。

    close(ch)

    关闭channel, 很简单。如果是 nil 或者已经被 close 了,直接 panic。

    关闭后,会唤醒 recvqsendq 两个链表中的所有 goroutine。

    if(c == nil)// nil channel
            runtime·panicstring("close of nil channel");
    if(c->closed) { // closed channel
            runtime·panicstring("close of closed channel");
    }
    
    c->closed = true;
    
    // release all readers
    for(;;) {
            sg = dequeue(&c->recvq);
            if(sg == nil)
                    break;
            runtime·ready(gp);
    }
    
    // release all writers
    for(;;) {
            sg = dequeue(&c->sendq);
            if(sg == nil)
                    break;
            runtime·ready(gp);
    }
    

    sendq 里的 goroutine 一旦被唤醒,就会 panic,因为它在尝试向一个关闭 channel 发数据。所以在参数传递时,会把 channel 做类型转换,声明下它是消费者还是生产者,即 <-chan Typechan<- Type 2 种类型。避免消费者随意关闭 channel 导致生产者 panic。

    nil channel 的用途

    通常用来暂时屏蔽一个 channel,比如:

    
    var done <-chan struct{} // 初始值是 nil
    
    for {
      select {
        case <-done:
            return
        case <-input:
          // 代码逻辑
          if canReturn {
            done = ctx.Done()
          } else {
            done = nil
          }
      }
    }
    

    代码中,虽然要被 context 来控制退出,但如果存在某些特殊状态,不允许被终止。就要用到 nil channel 了。

    总结

    非 buffer 的 channel 比 buffer channel 少了一次内存 copy。但非 buffer channel 工作起来基本就是相当于个互斥锁,会让 goroutine 无法并行,在多核机器上会导致程序的处理效率很差(即最大并发量很低,机器的 CPU 利用率低)。所以如果 channel 是一个常驻型的,直接make 一个大一点的 buffer channel 没关系。

    通过 close channel 做简单的广播通知,这个很常用,官方 context 库也是这么做的。并不一定要用 context 库去通知,一个非 buffer 的 chan struct 变量就够。

    channel 也是一种数据类型,即使没有 close 也是可以被 GC 的,没必要去做特殊的管理。

    相关文章

      网友评论

          本文标题:Go 语言的 Channel - 源码分析

          本文链接:https://www.haomeiwen.com/subject/dibtfqtx.html