本文主要讲解Go并发编程之Select
目录
- 介绍
- 基础语法
- timeout
- 综合实例
select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收,它的case涉及到channel有关的I/O操作。
或者换一种说法,select就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。
Go 语言的 select 语句借鉴自 Unix 的 select() 函数,在 Unix 中,可以通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回(C 语言中就是这么做的),后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select关键字,用于处理并发编程中通道之间异步 IO 通信问题。
select基础语法
select {
case <-ch1:
// 如果从 ch1 信道成功接收数据,则执行该分支代码
case ch2 <- 1:
// 如果成功向 ch2 信道成功发送数据,则执行该分支代码
default:
// 如果上面都没有成功,则进入 default 分支处理流程,有了default非阻塞式
}
- select语句只能用于信道的读写操作
- select中的case条件(非阻塞)是并发执行的,select会选择先操作成功的那个case条件去执行。如果多个同时返回,则随机选择一个执行,此时将无法保证执行顺序。
- case条件语句中如果存在信道值为nil的读写操作,则该分支将被忽略。
- 如果有超时条件语句,判断逻辑为如果在这个时间段内一直没有满足条件的case,则执行这个超时case。一般用超时语句代替了default语句
- 对于空的select{},会引起死锁
- 对于for中的select{}, 也有可能会引起cpu占用过高的问题
timeout
select有很重要的一个应用就是超时处理。 如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。
import "time"
import "fmt"
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}
Timer和Ticker
Timer
timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个Channel,在将来的那个时间那个Channel提供了一个时间值。下面的例子中第二行会阻塞2秒钟左右的时间,直到时间到了才会继续执行。
timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")
当然如果你只是想单纯的等待的话,可以使用time.Sleep来实现。
还可以使用timer.Stop来停止计时器。
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
ticker
ticker是一个定时触发的计时器,它会以一个间隔(interval)往Channel发送一个事件(当前时间),而Channel的接收者可以以固定的时间间隔从Channel中读取事件。下面的例子中ticker每500毫秒触发一次,你可以观察输出的时间。
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()
类似timer, ticker也可以通过Stop方法来停止。一旦它停止,接收者不再会从channel中接收数据了。
综合例子
从该例子中可以学到
- select的使用:有了default代表非阻塞模式
- 定时器的使用
- 在select中使用了Nil channel
package main
import (
"fmt"
"math/rand"
"time"
)
func worker(id int,c chan int) {
for n := range c { //待到c 被close时,退出
time.Sleep(time.Second * 1) //收到的数据消费太慢了,会被冲掉。
fmt.Printf("worker %d receiver %d \n", id, n)
}
}
func createWorker(id int) chan int {
c := make(chan int)
//处理
go worker(id,c)
return c
}
func generator() chan int{
out := make(chan int)
go func() {
i := 0
for{
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i ++
}
}()
return out
}
func main() {
//channel内部收发数据为阻塞式的
//如果要实现非阻塞式的,用select+default
var c1, c2 = generator(),generator() //c1 与 c2 nil vs nil
var worker = createWorker(0) //nil channel
n:=0
var values []int
tm := time.After(10 * time.Second)
tick := time.Tick(time.Second)
for {
var activeValue int
var activeWorker chan int
if len(values) > 0 {
activeWorker = worker
activeValue = values[0]
}
select {
case n = <-c1: //从c1成功接收数据
values = append(values,n)
case n = <-c2: //从c2成功接收数据
values = append(values,n)
case activeWorker <- activeValue:
values = values[1:]
case <-time.After(800*time.Millisecond): //在80秒中未送入数据,打印timeout。
fmt.Println("timeout")
case <- tick: //每隔1秒,打印队列中的长度
fmt.Println("queue len = ",len(values))
case <-tm:
fmt.Println("Bye") //总时长超过定义的时间,程序退出
return
}
}
}
网友评论