美文网首页
go的chan中的锁

go的chan中的锁

作者: 柯基是只dog | 来源:发表于2018-12-28 18:31 被阅读0次

    chan是go中csp的关键,网上一直说chan性能一般,因为用到了锁,我就找来源码研究了一下

    我们先看chan对象结构,发现有一个lock的锁字段

    // hchan holds the state of one channel: a circular buffer (buf, dataqsiz,
    // qcount, sendx, recvx), a description of the element (elemsize, elemtype),
    // a closed flag, the queues of waiting receivers and senders, and the lock
    // that protects all of these fields.
    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16         // presumably the size of one element — not shown here, confirm
        closed   uint32         // presumably 0 while open and nonzero once closed — confirm against closechan
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters
    
        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }
    

    那我们直接看发送和接收方法中用到锁了吗

    // 发送
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {
            if !block {
                return false
            }
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        if debugChan {
            print("chansend: chan=", c, "\n")
        }
    
        if raceenabled {
            racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
        }
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock)
    
    ...
    // 关闭
    func closechan(c *hchan) {
        if c == nil {
            panic(plainError("close of nil channel"))
        }
    
        lock(&c.lock)
    
    ...
    // 接收:
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // raceenabled: don't need to check ep, as it is always on the stack
        // or is new memory allocated by reflect.
    
        if debugChan {
            print("chanrecv: chan=", c, "\n")
        }
    
        if c == nil {
            if !block {
                return
            }
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock)
    
    // lock acquires the mutex l and returns only once the caller holds it.
    //
    // It escalates through three stages: a single atomic exchange (the
    // uncontended fast path), a bounded amount of spinning and yielding, and
    // finally sleeping on l.key via futexsleep. l.key takes one of three
    // values: mutex_unlocked, mutex_locked, or mutex_sleeping.
    func lock(l *mutex) {
        gp := getg()
    
        // A negative count means the lock bookkeeping on this m is already
        // corrupt, so abort rather than continue.
        if gp.m.locks < 0 {
            throw("runtime·lock: lock count")
        }
        gp.m.locks++
    
        // Speculative grab for lock.
        // The exchange stores mutex_locked unconditionally; if the value it
        // replaced was mutex_unlocked, the lock was free and is now ours.
        v := atomic.Xchg(key32(&l.key), mutex_locked)
        if v == mutex_unlocked {
            return
        }
    
        // wait is either MUTEX_LOCKED or MUTEX_SLEEPING
        // depending on whether there is a thread sleeping
        // on this mutex. If we ever change l->key from
        // MUTEX_SLEEPING to some other value, we must be
        // careful to change it back to MUTEX_SLEEPING before
        // returning, to ensure that the sleeping thread gets
        // its wakeup call.
        wait := v
    
        // On uniprocessors, no point spinning.
        // On multiprocessors, spin for ACTIVE_SPIN attempts.
        spin := 0
        if ncpu > 1 {
            spin = active_spin
        }
        for {
            // Try for lock, spinning.
            // Each attempt installs wait (not mutex_locked) so that a
            // previously observed mutex_sleeping state is preserved.
            for i := 0; i < spin; i++ {
                for l.key == mutex_unlocked {
                    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
                        return
                    }
                }
                procyield(active_spin_cnt)
            }
    
            // Try for lock, rescheduling.
            // Same attempt as above, but calling osyield between tries
            // instead of procyield.
            for i := 0; i < passive_spin; i++ {
                for l.key == mutex_unlocked {
                    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
                        return
                    }
                }
                osyield()
            }
    
            // Sleep.
            // Mark the mutex as having a sleeper. If it happened to be free
            // at that instant we now own it (with key left at mutex_sleeping).
            v = atomic.Xchg(key32(&l.key), mutex_sleeping)
            if v == mutex_unlocked {
                return
            }
            // From here on every later acquisition attempt must install
            // mutex_sleeping, per the invariant described at wait above.
            wait = mutex_sleeping
            // NOTE(review): presumably blocks while l.key == mutex_sleeping,
            // with -1 meaning no timeout — confirm against futexsleep.
            futexsleep(key32(&l.key), mutex_sleeping, -1)
        }
    }
    

    嗯~确实每次操作前都尝试加锁了。chan的逻辑其实不复杂,分为阻塞模式和带缓冲区模式两种:阻塞模式下,每次读写都先判断对方队列是否有等待的g,如果没有的话,就把自己当前的g打包加入到对应的等待队列,然后休眠,等待对方有g来唤醒。
    带缓冲区就是多了个缓冲区,每次发送或接收都对缓冲区操作,写的时候缓冲区满和读的时候缓冲区空都打包自己休眠等待。

    对了,差点忘了select。select是chan的多路复用,能监听多个chan。select同样加锁了,而且还是对所有的chan都加锁- -!加完锁才判断所有的chan是否可以操作,如果都不能,则把当前g循环加入所有chan的对应队列中。这样看来,以前业务中还真有蛮多协程监听多个chan的场景,而且是死循环等待,等于每次都会给所有chan加一遍锁,高并发下还是蛮影响效率的。

    func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
        if debugSelect {
            print("select: cas0=", cas0, "\n")
        }
    
        cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
        order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    
        scases := cas1[:ncases:ncases]
        pollorder := order1[:ncases:ncases]
        lockorder := order1[ncases:][:ncases:ncases]
    
        // Replace send/receive cases involving nil channels with
        // caseNil so logic below can assume non-nil channel.
        for i := range scases {
            cas := &scases[i]
            if cas.c == nil && cas.kind != caseDefault {
                *cas = scase{}
            }
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
            for i := 0; i < ncases; i++ {
                scases[i].releasetime = -1
            }
        }
    
        // The compiler rewrites selects that statically have
        // only 0 or 1 cases plus default into simpler constructs.
        // The only way we can end up with such small sel.ncase
        // values here is for a larger select in which most channels
        // have been nilled out. The general code handles those
        // cases correctly, and they are rare enough not to bother
        // optimizing (and needing to test).
    
        // generate permuted order
        for i := 1; i < ncases; i++ {
            j := fastrandn(uint32(i + 1))
            pollorder[i] = pollorder[j]
            pollorder[j] = uint16(i)
        }
    
        // sort the cases by Hchan address to get the locking order.
        // simple heap sort, to guarantee n log n time and constant stack footprint.
        for i := 0; i < ncases; i++ {
            j := i
            // Start with the pollorder to permute cases on the same channel.
            c := scases[pollorder[i]].c
            for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
                k := (j - 1) / 2
                lockorder[j] = lockorder[k]
                j = k
            }
            lockorder[j] = pollorder[i]
        }
        for i := ncases - 1; i >= 0; i-- {
            o := lockorder[i]
            c := scases[o].c
            lockorder[i] = lockorder[0]
            j := 0
            for {
                k := j*2 + 1
                if k >= i {
                    break
                }
                if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                    k++
                }
                if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                    lockorder[j] = lockorder[k]
                    j = k
                    continue
                }
                break
            }
            lockorder[j] = o
        }
    
        if debugSelect {
            for i := 0; i+1 < ncases; i++ {
                if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
                    print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
                    throw("select: broken sort")
                }
            }
        }
    
        // 加锁了,而且下面还有一处。。
        sellock(scases, lockorder)
    
    // sellock takes the lock of every channel referenced by scases, visiting
    // the cases in the order given by lockorder.
    //
    // Cases whose channel is nil are skipped. A case whose channel is the same
    // as the one most recently locked is skipped as well, so a channel that
    // appears in several consecutive lockorder entries is locked only once.
    // NOTE(review): this relies on lockorder placing cases that share a
    // channel next to each other — confirm against the caller's sort.
    func sellock(scases []scase, lockorder []uint16) {
        var prev *hchan
        for i := 0; i < len(lockorder); i++ {
            ch := scases[lockorder[i]].c
            if ch == nil || ch == prev {
                continue
            }
            prev = ch
            lock(&prev.lock)
        }
    }
    

    相关文章

      网友评论

          本文标题:go的chan中的锁

          本文链接:https://www.haomeiwen.com/subject/iyuelqtx.html