美文网首页
Go并发 - channel & sync.mutex

Go并发 - channel & sync.mutex

作者: markfork | 来源:发表于2020-12-14 11:35 被阅读0次

    大纲

    • 什么是channel
    • 什么是mutex
    • 使用channel、mutex 实现安全的并发减操作
    • 使用channel 实现批量任务处理
    • channel 的本质
    • 使用 channel or 并发原语的时机
    • 参考资料

    1 什么是 channel

    通过学习,总结 channel 的定义 及特性 :

    1. channel 是控制并发的高级语法 or 数据结构,是一种更高层次的并发控制模型(其实内部封装了共享内存 + 互斥锁)
    2. channel 是线程安全的,channel 内的信息同一时刻只能被1个 goroutine 获取(消费), send()  不会发生多 goroutine 同时操作覆盖的情况
    3. channel 底层用循环链表存储消息,具有 FIFO 的特性
    4. channel 能影响 goroutine 之间的阻塞与唤醒,这是在 channel.go 中实现的,
    不需要显式调用 wait or notify 实现。
    5. channel 适合在 消息的同步传递,数据流转场景使用;
    6. channel  用来 解耦 两端的 goroutine 
    

    channel 在 go 中 的编程语法:

    blockCh := make(chan int)
    bufferCh := make(chan int , [cap])
    sendCh := make(chan<- int)
    reveiveCh := make(<-chan int) 
    

    2 什么是mutex

    1.  mutex 是比 channel 更低层次的并发控制原语,在 多 goroutine 并发修改临界区(共享变量)资源时,提供一种安全保障机制;
    2. sync 包是 go 中控制并发的配角,但并不代表它不重要,在该使用的时候还得使用;
    

    mutex 在go 中语法:

    var mtx  sync.Mutex
    var shareVal  int
    go func() {
        mtx.Lock()
        shareVal -- 
        mtx.Unlock()
    }()
    

    3. 使用channel、mutex 实现安全的并发减操作

    Do not communicate by sharing memory; instead, share memory by communicating.

    如何理解这句话?

    假设现在有个需求: n 个协程对一共享变量 shareVal 进行并发减操作,实现快速、安全的并发减操作 (shareVal = shareVal - n);

    3.1 使用 channel 实现

    分析:

    • 安全性: 不通协程需要对 共享变量 shareVal 做 -1 操作, 且操作是安全的,即 当前协程在 做 -1操作时, 共享变量 shareVal 只对其可见,其余协程均处于block 状态。
    • 可见性: 某协程操作完后,下一个协程操作时,拿到的共享变量必须是前一个 -1 后的值。
    • 数据流动性: 通过将 shareVal 放在 channel 传递,实现 shareVal 在不同 goroutine 之间的共享。
    func main() {
        ch := make(chan int, 1)
        var shareVal = 10000
        var i = 0
        ch <- shareVal
        var wg sync.WaitGroup
    
        for i < 10000 {
            wg.Add(1)
            go func() {
                defer wg.Done()
                val := <-ch
                val--
                ch <- val
            }()
            i++
        }
    
        wg.Wait()
        fmt.Printf("shareVal | %d", <-ch)
    }
    

    3.2 使用 mutex 实现 - 通过共享内存来实现通信

    func main() {
        var i = 0
        var shareVal = 10000
        var wg sync.WaitGroup
    
        var mtx sync.Mutex
        for i < 10000 {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mtx.Lock()
                shareVal--
                mtx.Unlock()
            }()
            i++
        }
        wg.Wait()
        fmt.Printf("shareVal | %d", shareVal)
    }
    

    4 使用channel 实现批量任务处理

    假设现在有一个场景: 客户端发送大量长事务请求操作数据库(IO 密集型任务,短时间内不能返回数据),服务器核心数为8, 如何充分利用服务器资源(核心)实现批量处理的需求?

    这种问题使用信号量机制可以解决:

    func getData(idx int) {
        //time.Sleep(10 * time.Second)
        fmt.Printf("handler task_%d over\n", idx)
    }
    
    func main() {
        // semaphore 机制实现 任务批量处理
        var processNum = 8
        // 初始化信号量池  processNum
        sephCh := make(chan int, processNum)
        for i := 0; i < 8; i++ {
            sephCh <- 1
        }
    
        var wg sync.WaitGroup
        // 模拟多任务
        for j := 0; j < 10000; j++ {
            wg.Add(1)
            // 开启子协程处理任务之前先从 信号量池 中拿到 许可证
            <-sephCh
            fmt.Printf("run task %d \n", j)
            go func(idx int) {
                defer wg.Done()
                getData(idx)
                // 处理完后归还许可证,供 blocking-> ready 状态的协程 使用 
                sephCh <- 1
            }(j)
        }
    
        wg.Wait()
    
        close(sephCh)
        
        for s := range sephCh {
            fmt.Printf("%d\n", s)
        }
    
        fmt.Printf("all task over!")
    }
    

    4 channel 的本质

    动图描述

    4.1 channel - 底层数据结构

    channel 底层数据结构

    详细解释:

    • buf 是有缓冲的channel所特有的结构,用来存储缓存数据,是循环链表;
    • sendx和recvx用于记录buf这个循环链表中的index;
    • lock是互斥锁,控制 channel 的 接收与取出操作;
    • recvq和sendq分别是接收( <-channel )或者发送(channel <- val) 的 goroutine 抽象出来的结构体(sudog),是双向链表;

    4.2 send or receive 底层操作

    以如下缓冲channel 为例:

    ch := make(chan int, 3)
    

    4.2.1 send 操作

    执行 send 操作, 每一步的操作的细节可以细化为:

    第一,加锁
    第二,把数据从goroutine中copy到“队列”中(或者从队列中copy到goroutine中)。
    第三,释放锁
    

    每一步的操作总结为动态图为:(发送过程)


    chan<- 底层

    4.2.2 receive 操作

    <-chan 底层

    4.3 buffer channel 满与空 情况下的 goroutine send or receive 过程

    4.3.1 send val to full channel

    当前场景,channel 已满,groutine 还在并发往 channel 中塞数据:

    ch := make(chan int, 3)
    
    ch <- 1
    ch <- 1
    ch <- 1
    
    full channel

    这个时候G1正在正常运行,当再次进行send操作(ch<-1)的时候,会主动调用Go的调度器,让G1等待,并从让出M,让其他G去使用, 如下图所示:

    正在运行的G1 被强制 gopark
    即 的G1 被强制 gopark,从 running 状态变为 block 状态;

    同时G1也会被抽象成含有G1指针和send元素的sudog结构体保存到hchan的sendq中等待被唤醒。


    send gorutine 假如send q 阻塞队列,等待 channel 释放空间

    问题来了, G1何时被唤醒?这个时候G2隆重登场。


    G2 receive操作

    G2执行了recv操作p := <-ch,于是会发生以下的操作:


    取满操作后,阻塞协程的运行状态

    4.3.2 receive val from empty channel

    当前场景:

    ch := make(chan int, 3)
    <-ch
    <-ch
    <-ch
    // channel 已空, 继续执行 receive 操作
    <-ch
    

    抽象场景如下图所示:


    G2->Go 调度器->G2 block

    这个时候G2会主动调用Go的调度器,让G2等待,并从让出M,让其他G去使用。

    G2还会被抽象成含有G2指针和recv空元素的sudog结构体保存到hchan的recvq中等待被唤醒,如下图所示:


    G2 抽象为 point 放在 recvq 中

    此时恰好有个goroutine G1开始向channel中推送数据 ch <- 1, 比较特殊的情况(不用加锁)


    just only copy

    接下来,G2 被 Go schedule 唤醒,进入P 管理的 goroutine 就绪队列,等待运行


    go schedule

    5 使用channel 的时机 or 使用 mutex 的时机

    • channel 关注数据流动,如果任务处理模型中存在数据流动,使用channel 解决;
    • 数据的控制权需要在多个gorutine 中传递, 使用 channel 处理;

    6 参考资料

    相关文章

      网友评论

          本文标题:Go并发 - channel & sync.mutex

          本文链接:https://www.haomeiwen.com/subject/wtmpgktx.html