美文网首页
channel是干什么的

channel是干什么的

作者: 将军红 | 来源:发表于2019-12-19 16:39 被阅读0次
    1. channel 源码地址

    2. Channel 应用场景实践 参考链接

    • channel 是干什么的

    意义:channel 是用来通信的
    实际上:(数据拷贝了一份,并通过 channel 传递,本质就是个队列)

    channel 应该用在什么地方
    核心:需要通信的地方
    例如以下场景:

    通知广播
    交换数据
    显式同步
    并发控制
    ...
    

    超时控制

    // 利用 time.After 实现
    func main() {
        done := do()
        select {
        case <-done:
            // logic
        case <-time.After(3 * time.Second):
            // timeout
        }
    }
    
    func do() <-chan struct{} {
        done := make(chan struct{}, 1)
        go func() {
            // do something
            // ...
            done <- struct{}{}
        }()
        return done
    }
    

    取最快的结果

    比较常见的一个场景是重试,第一个请求在指定超时时间内没有返回结果,这时重试第二次,取两次中最快返回的结果使用。
    超时控制在上面有,下面代码部分就简单实现调用多次了。

    func main() {
        ret := make(chan string, 3)
        for i := 0; i < cap(ret); i++ {
            go call(ret)
        }
            fmt.Println(<-ret)
    }
    
    func call(ret chan<- string) {
        // do something
        // ...
        ret <- "result"
    }
    

    限制最大并发数

    // 最大并发数为 2
    limits := make(chan struct{}, 2)
    for i := 0; i < 10; i++ {
        go func() {
            // 缓冲区满了就会阻塞在这
            limits <- struct{}{}
            do()
            <-limits
        }()
    }
    

    for...range 优先

    for ... range c { do } 这种写法相当于 if _, ok := <-c; ok { do }

    func main() {
        c := make(chan int, 20)
        go func() {
            for i := 0; i < 10; i++ {
                c <- i
            }
            close(c)
        }()
        // 当 c 被关闭后,取完里面的元素就会跳出循环
        for x := range c {
            fmt.Println(x)
        }
    }
    

    多个 goroutine 同步响应,利用 close 广播

    func main() {
        c := make(chan struct{})
        for i := 0; i < 5; i++ {
            go do(c)
        }
        close(c)
    }
    
    func do(c <-chan struct{}) {
        // 会阻塞直到收到 close
        <-c
        fmt.Println("hello")
    }
    

    非阻塞的 select
    select 本身是阻塞的,当所有分支都不满足就会一直阻塞,如果想不阻塞,那么一个什么都不干的 default 分支是最好的选择

    select {
    case <-done:
        return
    default:
    }
    

    for{select{}} 终止
    尽量不要用 break label 形式,而是把终止循环的条件放到 for 条件里来实现

    for ok {
        select {
        case ch <- 0:
        case <-done:
            ok = false
        }
    }
    

    并发控制

    假设有一组任务需要异步处理且量很大,那我们需要同时开启多个 worker 以保证任务的处理速度而不会堵塞任务。其他语言,可能会需要开启多进程来完成,多进程的控制、IO 消耗等会是个需要注意的问题,而这些 Go 都能帮我们很轻易的解决。

    大致的实现要点和流程:

    创建2个信道,messages 用于传送任务消息,result 用于接收消息处理结果
    创建3个 Worker 协程,用于接收和处理来自 messages 信道的任务消息,并将处理结果通过信道 result 返回
    通过信道 messages 发布10条任务
    通过信道 result 接收任务处理结果
    

    示例代码:

    package main
    
    import (
        "fmt"
        "strconv"
        "math/rand"
        "time"
    )
    
    type Message struct {
        Id   int
        Name string
    }
    
    func main() {
        messages := make(chan Message, 100)
        result := make(chan error, 100)
    
        // 创建任务处理Worker
        for i := 0; i < 3; i ++ {
            go worker(i, messages, result)
        }
    
        total := 0
        // 发布任务
        for k := 1; k <= 10; k ++ {
            messages <- Message{Id: k, Name: "job" + strconv.Itoa(k)}
            total += 1
        }
    
        close(messages)
    
        // 接收任务处理结果
        for j := 1; j <= total; j ++ {
            res := <-result
            if res != nil {
                fmt.Println(res.Error())
            }
        }
    
        close(result)
    }
    
    func worker(worker int, msg <-chan Message, result chan<- error) {
        // 从通道 chan Message 中监听&接收新的任务
        for job := range msg {
            fmt.Println("worker:", worker, "msg: ", job.Id, ":", job.Name)
    
            // 模拟任务执行时间
            time.Sleep(time.Second * time.Duration(RandInt(1, 3)))
    
            // 通过通道返回执行结果
            result <- nil
        }
    }
    
    func RandInt(min, max int) int {
        rand.Seed(time.Now().UnixNano())
        return min + rand.Intn(max-min+1)
    }
    

    相关文章

      网友评论

          本文标题:channel是干什么的

          本文链接:https://www.haomeiwen.com/subject/mcxvnctx.html