美文网首页
channel是干什么的

channel是干什么的

作者: 将军红 | 来源:发表于2019-12-19 16:39 被阅读0次
  1. channel 源码地址

  2. Channel 应用场景实践 参考链接

  • channel 是干什么的

意义:channel 是用来通信的
实际上:(数据拷贝了一份,并通过 channel 传递,本质就是个队列)

channel 应该用在什么地方
核心:需要通信的地方
例如以下场景:

通知广播
交换数据
显式同步
并发控制
...

超时控制

// 利用 time.After 实现
func main() {
    done := do()
    select {
    case <-done:
        // logic
    case <-time.After(3 * time.Second):
        // timeout
    }
}

func do() <-chan struct{} {
    done := make(chan struct{}, 1)
    go func() {
        // do something
        // ...
        done <- struct{}{}
    }()
    return done
}

取最快的结果

比较常见的一个场景是重试,第一个请求在指定超时时间内没有返回结果,这时重试第二次,取两次中最快返回的结果使用。
超时控制在上面有,下面代码部分就简单实现调用多次了。

func main() {
    ret := make(chan string, 3)
    for i := 0; i < cap(ret); i++ {
        go call(ret)
    }
        fmt.Println(<-ret)
}

func call(ret chan<- string) {
    // do something
    // ...
    ret <- "result"
}

限制最大并发数

// 最大并发数为 2
limits := make(chan struct{}, 2)
for i := 0; i < 10; i++ {
    go func() {
        // 缓冲区满了就会阻塞在这
        limits <- struct{}{}
        do()
        <-limits
    }()
}

for...range 优先

for ... range c { do } 这种写法相当于 if _, ok := <-c; ok { do }

func main() {
    c := make(chan int, 20)
    go func() {
        for i := 0; i < 10; i++ {
            c <- i
        }
        close(c)
    }()
    // 当 c 被关闭后,取完里面的元素就会跳出循环
    for x := range c {
        fmt.Println(x)
    }
}

多个 goroutine 同步响应,利用 close 广播

func main() {
    c := make(chan struct{})
    for i := 0; i < 5; i++ {
        go do(c)
    }
    close(c)
}

func do(c <-chan struct{}) {
    // 会阻塞直到收到 close
    <-c
    fmt.Println("hello")
}

非阻塞的 select
select 本身是阻塞的,当所有分支都不满足就会一直阻塞,如果想不阻塞,那么一个什么都不干的 default 分支是最好的选择

select {
case <-done:
    return
default:
}

for{select{}} 终止
尽量不要用 break label 形式,而是把终止循环的条件放到 for 条件里来实现

for ok {
    select {
    case ch <- 0:
    case <-done:
        ok = false
    }
}

并发控制

假设有一组任务需要异步处理且量很大,那我们需要同时开启多个 worker 以保证任务的处理速度而不会堵塞任务。其他语言,可能会需要开启多进程来完成,多进程的控制、IO 消耗等会是个需要注意的问题,而这些 Go 都能帮我们很轻易的解决。

大致的实现要点和流程:

创建2个信道,messages 用于传送任务消息,result 用于接收消息处理结果
创建3个 Worker 协程,用于接收和处理来自 messages 信道的任务消息,并将处理结果通过信道 result 返回
通过信道 messages 发布10条任务
通过信道 result 接收任务处理结果

示例代码:

package main

import (
    "fmt"
    "strconv"
    "math/rand"
    "time"
)

type Message struct {
    Id   int
    Name string
}

func main() {
    messages := make(chan Message, 100)
    result := make(chan error, 100)

    // 创建任务处理Worker
    for i := 0; i < 3; i ++ {
        go worker(i, messages, result)
    }

    total := 0
    // 发布任务
    for k := 1; k <= 10; k ++ {
        messages <- Message{Id: k, Name: "job" + strconv.Itoa(k)}
        total += 1
    }

    close(messages)

    // 接收任务处理结果
    for j := 1; j <= total; j ++ {
        res := <-result
        if res != nil {
            fmt.Println(res.Error())
        }
    }

    close(result)
}

func worker(worker int, msg <-chan Message, result chan<- error) {
    // 从通道 chan Message 中监听&接收新的任务
    for job := range msg {
        fmt.Println("worker:", worker, "msg: ", job.Id, ":", job.Name)

        // 模拟任务执行时间
        time.Sleep(time.Second * time.Duration(RandInt(1, 3)))

        // 通过通道返回执行结果
        result <- nil
    }
}

func RandInt(min, max int) int {
    rand.Seed(time.Now().UnixNano())
    return min + rand.Intn(max-min+1)
}

相关文章

网友评论

      本文标题:channel是干什么的

      本文链接:https://www.haomeiwen.com/subject/mcxvnctx.html