- channel 是干什么的
意义:channel 是用来通信的
实际上:(数据拷贝了一份,并通过 channel 传递,本质就是个队列)
channel 应该用在什么地方
核心:需要通信的地方
例如以下场景:
通知广播
交换数据
显式同步
并发控制
...
超时控制
// 利用 time.After 实现
func main() {
done := do()
select {
case <-done:
// logic
case <-time.After(3 * time.Second):
// timeout
}
}
func do() <-chan struct{} {
done := make(chan struct{}, 1)
go func() {
// do something
// ...
done <- struct{}{}
}()
return done
}
取最快的结果
比较常见的一个场景是重试,第一个请求在指定超时时间内没有返回结果,这时重试第二次,取两次中最快返回的结果使用。
超时控制在上面有,下面代码部分就简单实现调用多次了。
func main() {
ret := make(chan string, 3)
for i := 0; i < cap(ret); i++ {
go call(ret)
}
fmt.Println(<-ret)
}
func call(ret chan<- string) {
// do something
// ...
ret <- "result"
}
限制最大并发数
// 最大并发数为 2
limits := make(chan struct{}, 2)
for i := 0; i < 10; i++ {
go func() {
// 缓冲区满了就会阻塞在这
limits <- struct{}{}
do()
<-limits
}()
}
for...range 优先
for ... range c { do } 这种写法相当于 if _, ok := <-c; ok { do }
func main() {
c := make(chan int, 20)
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()
// 当 c 被关闭后,取完里面的元素就会跳出循环
for x := range c {
fmt.Println(x)
}
}
多个 goroutine 同步响应,利用 close 广播
func main() {
c := make(chan struct{})
for i := 0; i < 5; i++ {
go do(c)
}
close(c)
}
func do(c <-chan struct{}) {
// 会阻塞直到收到 close
<-c
fmt.Println("hello")
}
非阻塞的 select
select 本身是阻塞的,当所有分支都不满足就会一直阻塞,如果想不阻塞,那么一个什么都不干的 default 分支是最好的选择
select {
case <-done:
return
default:
}
for{select{}} 终止
尽量不要用 break label 形式,而是把终止循环的条件放到 for 条件里来实现
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
并发控制
假设有一组任务需要异步处理且量很大,那我们需要同时开启多个 worker 以保证任务的处理速度而不会堵塞任务。其他语言,可能会需要开启多进程来完成,多进程的控制、IO 消耗等会是个需要注意的问题,而这些 Go 都能帮我们很轻易的解决。
大致的实现要点和流程:
创建2个信道,messages 用于传送任务消息,result 用于接收消息处理结果
创建3个 Worker 协程,用于接收和处理来自 messages 信道的任务消息,并将处理结果通过信道 result 返回
通过信道 messages 发布10条任务
通过信道 result 接收任务处理结果
示例代码:
package main
import (
"fmt"
"strconv"
"math/rand"
"time"
)
type Message struct {
Id int
Name string
}
func main() {
messages := make(chan Message, 100)
result := make(chan error, 100)
// 创建任务处理Worker
for i := 0; i < 3; i ++ {
go worker(i, messages, result)
}
total := 0
// 发布任务
for k := 1; k <= 10; k ++ {
messages <- Message{Id: k, Name: "job" + strconv.Itoa(k)}
total += 1
}
close(messages)
// 接收任务处理结果
for j := 1; j <= total; j ++ {
res := <-result
if res != nil {
fmt.Println(res.Error())
}
}
close(result)
}
func worker(worker int, msg <-chan Message, result chan<- error) {
// 从通道 chan Message 中监听&接收新的任务
for job := range msg {
fmt.Println("worker:", worker, "msg: ", job.Id, ":", job.Name)
// 模拟任务执行时间
time.Sleep(time.Second * time.Duration(RandInt(1, 3)))
// 通过通道返回执行结果
result <- nil
}
}
func RandInt(min, max int) int {
rand.Seed(time.Now().UnixNano())
return min + rand.Intn(max-min+1)
}
网友评论