美文网首页
sync包 waitgroup源码阅读

sync包 waitgroup源码阅读

作者: one_zheng | 来源:发表于2019-05-29 19:24 被阅读0次

    借鉴于Go夜读,加了个人理解:https://reading.developerlearning.cn/articles/sync/sync_waitgroup_source_code_analysis/

    go版本:go1.12 windows/amd64

    结构体

    // A WaitGroup must not be copied after first use.
    type WaitGroup struct {
        noCopy noCopy // 标记不可复制,使用go vet作为检测使用,并因此只能进行指针传递,从而保证全局唯一
        // 64位值:高32位是计数器,低32位是goroutine等待计数
        // 不使用64位值是因为32位的编译器不能确保64位原子操作的位对齐 
        // 一个uit32占4个byte,我们分配了12个byte对齐的8个byte作为状态,剩下4个用于信号量的存储
        state1 [3]uint32 // 前8个元素代表状态。转为2进制:0000 0000,0000 0000... ...0000 0000 后4个元素代表信号量
    }
    

    函数

    以下代码已经去掉了与核心代码无关的race代码。

    state

    // state returns pointers to the state and sema fields stored within wg.state1.
    // 根据结构体中初始化分配的 12bytes 来兼容处理 64位操作系统和 32位操作系统,
    // 具体原理是,12bytes 中必定含有一个8bytes,仅仅使用这个含有的8bytes做为数据对齐使用,具体:
    // 当指针位置刚好指在 (8byte) 的位置,证明位对齐,使用 前8bytes 作为状态计数;
    // 当指针位置不指在 (8byte) ,没对齐,抛弃前 4bytes,使用后8bytes作为位对齐,用于记录状态计数。
    func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
        if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {  //(Why?)
                   // 前8byte为状态             
            return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
        } else {
                   // 后8byte为状态
            return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
        }
    }
    

    有关内存对齐:https://www.e-learn.cn/content/qita/1020837

    Add

    添加或者减少等待goroutine的数量。

    参数delta可能是负的,加到WaitGroup计数器,可能出现如下结果 - 如果计数器变为零,所有被阻塞的goroutines都会被释放。 - 如果计数器变成负数,就panic。

    
    func (wg *WaitGroup) Add(delta int) {
        // 获得wg.state1数组中元素组成的二进制对应的十进制的值
        statep, semap := wg.state()
        // 高32位是计数器,低32位是goroutine等待计数
        // 原子操作,如初始状态 statep 为空,且 delta 等于 1, 操作 加 1:
        // 00000000 00000000 00000000 00000001 00000000 …… 00000000
        // \___________ 前32位 _______________/\__ 后32位均为0 __/
        // 若当前状态位存在值 1,则再添加 delta 等于 1, 其结果为:
        // 00000000 00000000 00000000 00000010 00000000 …… 00000000
        // \___________ 前32位 _______________/\__ 后32位均为0 __/  
        state := atomic.AddUint64(statep, uint64(delta)<<32)
          // 获取计数器
        v := int32(state >> 32)
        w := uint32(state)
           // 计数器为负数,报panic
        if v < 0 {
            panic("sync: negative WaitGroup counter")
        }
        if w != 0 && delta > 0 && v == int32(delta) {
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        if v > 0 || w == 0 {  // w == 0 代表无等待的goroutine
            return
        }
        // Done()函数调用,v==0,唤醒线程
        // 不能出现等待计数器>0,但goroutine已经设置为0的情况:
        // 1.Wait()跟Add()同时发生;
        // 2.如果计数器已经为0了,Wait()不能再增加等待计数器了 
        if *statep != state {
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        // Reset waiters count to 0.
        *statep = 0
        for ; w != 0; w-- {
            // 目的是作为一个简单的wakeup原语,以供同步使用。true为唤醒排在等待队列的第一个goroutine
            runtime_Semrelease(semap, false)
        }
    }
    

    Done

    相当于Add(-1)。

    func (wg *WaitGroup) Done() {
        // 计数器减一
        wg.Add(-1)
    }
    

    Wait

    // Wait blocks until the WaitGroup counter is zero.
    func (wg *WaitGroup) Wait() {
        statep, semap := wg.state()
           // cas算法
        for {
            state := atomic.LoadUint64(statep)
           // 高32位是计数器
            v := int32(state >> 32)
            w := uint32(state)
          // 计数器为0,结束等待
            if v == 0 {
                // Counter is 0, no need to wait.
                if race.Enabled {
                    race.Enable()
                    race.Acquire(unsafe.Pointer(wg))
                }
                return
            }
            // 增加等待goroutine计数,对低32位加1,不需要移位
            if atomic.CompareAndSwapUint64(statep, state, state+1) {    
             // 目的是作为一个简单的sleep原语,以供同步使用
                runtime_Semacquire(semap)
                if *statep != 0 {
                    panic("sync: WaitGroup is reused before previous Wait has returned")
                }
                return
            }
        }
    }
    

    使用注意事项

    1. WaitGroup不能保证多个 goroutine 执行次序
    2. WaitGroup无法指定固定的goroutine数目

    相关文章

      网友评论

          本文标题:sync包 waitgroup源码阅读

          本文链接:https://www.haomeiwen.com/subject/wbsyzqtx.html