channel

作者: 林海畅游 | 来源:发表于2021-11-19 18:30 被阅读0次

    1.使用场景

    (1) 消息传递、消息过滤
    (2) 信号广播
    (3) 事件订阅与广播
    (4) 请求、相应转发
    (5) 任务分发
    (6) 结果汇总
    (7) 并发控制
    (8) 同步与异步

    2. 三种状态

    1. nil : 未初始化,只进行了声明,或者手动赋值为nil
    2. active : 正常的,可读可写
    3. closed : 已关闭。channel 的值不是nil
    

    3. 三种操作

    1. 读  <- ch
    2. 写  ch <- 
    3. 关闭 close(ch)
    

    4. 9种情况

    image.png

    备注:对于nil的,有一个特殊场景:当nil的通道在select 的某个case中时,这个case会阻塞,但不会造成死锁

    5. 常用操作

    5.1 使用for range 读

    场景:当需要不断从channel读取数据时
    原理:使用for-range 读取channel,既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,从而造成读到数据为通道所存储的数据类型的零值
    用法

    for  x := range ch  {
        fmt.Println(x)
    }
    

    5.2 使用_, ok 判断channel是否关闭

    场景:读channel,但不确定channel是否关闭时
    原理: 读已关闭的channel会得到零值
    用法

    if v, ok := <- ch; ok {
        fmt.Println(v)
    }
    

    5.3 使用select 处理多个channel

    场景:需要对多个通道进行同时处理,但只处理最先发生的的channel时
    原理 : select 可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case 永远为阻塞,无论读写。普通情况下,对nil通道的写是要panic的
    用法

    func (h *Handler) handle(job *Job) {
        select {
        case h.jobCh <- job:
            return 
        case <- h.stopCh:
           return
        }
    }
    

    5.4 使用channel的声明控制读写权限

    权限
    (1)读写均可的channel c : go func(c chan int)
    (2)只读的channel c : go func(c <- chan int)
    (3)只写的channel c: go func (c chan <- int)

    5.5 使用缓冲增强并发

    场景: 并发
    原理: 有缓冲通道可供多个协程同时处理,在一定程度可提高并发性
    用法

    //无缓冲
    ch1 := make(chan int)
    ch2 := make(chan int 0)
    //有缓冲
    ch3 := make(chan int 1)
    func generator(n int) <- chan int {
        outCh := make(chan int)
        go func() {
            for i:= 0; i < n; i++ {
                outCh <- i
            }
            close(outCh)
        }()
        return outCh
    }
    func do (inCh <- chan int, outCh chan <- int, wg *sync.WaitGroup) {
        for v := range inCh {
            outCh <- v * v
        }
        wg.Done()
    }
    
    func test() {
        inCh := generator(100)
        outCh := make(chan int, 10)
        var wg sync.WaitGroup
        wg.Add(5)
        for i := 0; i < 5; i++ {
            go do(inCh, outCh, &wg)
        }
        go func() {
            wg.Wait()
            close(outCh)
        }()
        for r:= range outCh {
            fmt.Println(r)
        }
    }
    

    5.6 为操作加上超时

    场景 : 需要超时控制操作
    原理 : 使用select 和tiem.After, 看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果
    用法

    func doWithTimeOut(timeout time.Duration) (int, error) {
        select {
        case ret := <-do():
            return ret, nil
        case <-time.After(timeout):
            return 0, errors.New("timeout")
        }
    }
    
    func do() <-chan int {
        outCh := make(chan int)
        go func() {
            // do work
        }()
        return outCh
    }
    

    5.7 使用time实现channel无阻塞读写

    场景: 并不希望在channel的读写上浪费时间
    原理: 是为操作加上超时的扩展,这里的操作是channel的读或写
    用法

    func unBlockRead(ch chan int) (x int, err error) {
        select {
        case x = <-ch:
            return x, nil
        case <-time.After(time.Microsecond):
            return 0, errors.New("read time out")
        }
    }
    
    func unBlockWrite(ch chan int, x int) (err error) {
        select {
        case ch <- x:
            return nil
        case <-time.After(time.Microsecond):
            return errors.New("read time out")
        }
    }
    

    5.8 使用close(ch)关闭所有下游协程

    场景 : 退出时,显示通知所有协程退出
    原理 : 所有读ch的协程都会收到close(ch)的信号
    用法

    func (h *Handler) Stop() {
        close(h.stopCh)
    
        // 可以使用WaitGroup等待所有协程退出
    }
    
    // 收到停止后,不再处理请求
    func (h *Handler) loop() error {
        for {
            select {
            case req := <-h.reqCh:
                go handle(req)
            case <-h.stopCh:
                return
            }
        }
    }
    

    5.9 使用chan struct {} 作为信号channel

    场景 : 使用channel传递信号,而不是传递数据时
    原理 : 没数据需要传递时,传递空struct
    用法

    // 上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据
    // 只是要给所有协程发送退出的信号
    type Handler struct {
        stopCh chan struct{}
        reqCh chan *Request
    }
    

    5.10 使用channel传递结构体的指针而非结构体

    场景 : 使用channel传递结构体数据时
    原理 : channel本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效
    用法

    reqCh chan *Request
    
    // 好过
    reqCh chan Request
    

    5. 11 使用channel 传递channel

    场景 : 使用场景有点多,通常是用来获取结果。
    原理 :channel可以用来传递变量,channel自身也是变量,可以传递自己。
    用法

    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    func main() {
        reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
    
        // 存放结果的channel的channel
        outs := make(chan chan int, len(reqs))
        var wg sync.WaitGroup
        wg.Add(len(reqs))
        for _, x := range reqs {
            o := handle(&wg, x)
            outs <- o
        }
    
        go func() {
            wg.Wait()
            close(outs)
        }()
    
        // 读取结果,结果有序
        for o := range outs {
            fmt.Println(<-o)
        }
    }
    
    // handle 处理请求,耗时随机模拟
    func handle(wg *sync.WaitGroup, a int) chan int {
        out := make(chan int)
        go func() {
            time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
            out <- a
            wg.Done()
        }()
        return out
    }
    

    相关文章

      网友评论

          本文标题:channel

          本文链接:https://www.haomeiwen.com/subject/cziqzltx.html