一、多独立协程并发——worker分工模式
并发协程独立运行且互不通信,主协程等待处理独立子协程的结果
并发编程有一种常见方式就是许多工作子协程都是独立的,互不干扰,但他们又是“同一时间”处理。
例如http服务器的工作模式就是这种,在go标准包net/http中,http服务启动后,它所接收的每一个请求都是独立协程处理的,每个请求的运行协程之间都互不通信。
我们知道,go http服务的请求处理协程都是动态创建的,但很多情况下,我们是需要一些固定数量的协程去独立处理任务,所以这里我们先说固定数量协程独立运行的情况。
范式实现:
- 创建三个通道,分别处理“任务单元”、“任务结果”、“任务完成状态”三种通信数据;
- 启动新增任务协程,把任务单元数据发送给jobs通道;
- 各任务单元分别由独立的worker协程处理,执行独立任务,并把任务处理结果发送给results通道;
- 启动等待任务结果协程,从done管道接收任务处理完成的数据;
- 主协程显示处理结果。
我们知道go有个select原语,专门用来处理阻塞或非阻塞通道数据的,我们可以使用for...select...范式最大效率地不断从通道中读出数据。代码如下:
func Demo() {
run("读书", "看报", "撸代码")
}
type TaskJob struct {
task string
results chan<- Result
}
func (j *TaskJob) do() {
fmt.Println("Do Task:", j.task)
// Do Something...
j.results <- Result{j.task, 200, "Successful"}
}
type Result struct {
task string
code int
message string
}
// 逻辑核心数作为并发worker数
var workers = runtime.NumCPU()
func run(tasks ...string) {
jobs := make(chan TaskJob, workers) // 任务通道
results := make(chan Result, len(tasks)) // 执行结果输出通道
done := make(chan struct{}, workers) // 任务完成状态通道
// 子协程添加任务
go addJob(jobs, tasks, results)
// 使用单独的worker协程执行并发任务
for i := 0; i < workers; i++ {
go doJobs(done, jobs)
}
// 主协程等待并处理结果
waitAndProcessResults1(done, results)
}
// 添加任务
func addJob(jobs chan<- TaskJob, tasks []string, results chan<- Result) {
for _, task := range tasks {
jobs <- TaskJob{task: task, results: results}
}
// 因为此为发送端,新增完任务后在此关闭通道
close(jobs)
}
// 执行任务
func doJobs(done chan<- struct{}, jobs <-chan TaskJob) {
for job := range jobs {
job.do()
}
done <- struct{}{}
}
// 等待并处理子协程记过:for...select...范式同时处理results和done通道,合并awaitCompletion()和showResults()
func waitAndProcessResults1(done <-chan struct{}, results <-chan Result) () {
// 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done
for i := workers; i > 0; {
select {
case result := <-results:
fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
case <-done:
i--
}
}
// 【非阻塞】worker全部做完后:results全部清空后退出
DONE:
for {
select {
case result := <-results:
fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
default:
break DONE
}
}
}
执行结果:
=== RUN TestDemo31
Do Task: 撸代码
Do Task: 读书
Task Job :撸代码, Result:200,Successful
Task Job :读书, Result:200,Successful
Do Task: 看报
Task Job :看报, Result:200,Successful
--- PASS: TestDemo31 (0.00s)
PASS
可以看到,当有好几个不同的通道需要处理时,使用select原语是非常方便的。在这里我们集中处理results和done通道。
添加子协程超时处理:
当对worker任务子协程有超时要求时,也可在select中添加超时操作,对waitAndProcessResults()修改如下:
// 合并awaitCompletion()和showResults(),并增加超时处理
func waitAndProcessResults(timeout time.Duration, done <-chan struct{}, results <-chan Result) () {
// 超时通道
finish := time.After(time.Duration(timeout))
// 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done,要么超时
for i := workers; i > 0; {
select {
case result := <-results:
fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
case <-finish:
fmt.Println("worker任务执行超时!")
return
case <-done:
i--
}
}
// 【非阻塞】worker全部做完后:results全部清空后退出
for {
select {
case result := <-results:
fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
case <-finish:
fmt.Println("worker任务执行超时!")
return
default:
return
}
}
}
二、地铁闸机——限流模式
开启多条子协程, 往有限的通道发送/接收数据,这种情形类似地铁站内的地铁闸机模式,闸机就那么几个,行人分组从闸机通过,通常哪个闸机人少,人们倾向于排哪个闸门的队,或者哪个闸机通行效率高就排哪个闸门的队。
在这种并发模式中,协程相当于行人,通道相当于地铁闸门,多条通道从有限闸门中通过。
具体代码如下:
func Demo() {
var workers = 100000
ch0 := make(chan int, 0)
ch1 := make(chan int, 1)
ch2 := make(chan int, 2)
ch3 := make(chan int, 3)
counter := make([]int, 4)
// 开workers条协程,发送消息到四条通道
for i := 0; i < workers; i++ {
go func(n int) {
select {
case ch0 <- n:
send(n, 0)
case ch1 <- n:
send(n, 1)
case ch2 <- n:
send(n, 2)
case ch3 <- n:
send(n, 3)
}
}(i)
}
// 主协程从四条通道接收workers条子协程发送的消息
for i := 0; i < workers; i++ {
select {
case rec := <-ch0:
receive(rec, 0)
counter[0]++
case rec := <-ch1:
receive(rec, 1)
counter[1]++
case rec := <-ch2:
receive(rec, 2)
counter[2]++
case rec := <-ch3:
receive(rec, 3)
counter[3]++
}
}
fmt.Println("channel counter:", counter)
}
func send(i, j int) {
fmt.Printf("goroutine#%d send %d to ch%d\n", i, i, j)
}
func receive(routine, ch int) {
fmt.Printf("receive from goroutine#%d to ch%d channel\n", ch, routine)
}
运行结果:
...
receive from goroutine#92612 to ch2 channel
receive from goroutine#92426 to ch0 channel
receive from goroutine#92733 to ch0 channel
receive from goroutine#92531 to ch0 channel
goroutine#92531 send 92531 to ch0
goroutine#92616 send 92616 to ch3
goroutine#92425 send 92425 to ch2
goroutine#92612 send 92612 to ch2
goroutine#92733 send 92733 to ch0
goroutine#92424 send 92424 to ch2
receive from goroutine#92423 to ch3 channel
receive from goroutine#92614 to ch1 channel
receive from goroutine#92734 to ch1 channel
goroutine#92617 send 92617 to ch3
goroutine#92735 send 92735 to ch1
goroutine#92530 send 92530 to ch3
...
channel counter: [25030 25073 25011 24886]
以上结果表明,多条协程数据比较均衡地从四条通道中通过
三、扇入扇出——分流模式
我们通常会遇到许多耗时任务,如一个通道的数据流向一个执行函数时,当前函数执行时长较长,我们可以把该通道的数据流拆分流向多个通道,并给每个通道启动相应的goroutine处理;随后把所有通道汇总到一个通道输出,以加大该耗时任务的执行效率。
以上过程我们成为扇入扇出过程:
- 扇出(Fan-out):将一个通道的数据分流给多个通道并启动多个goroutine处理;
- 扇入(Fan-in):将多个goroutines返回的通道的结果组合到一个通道并输出;
我们看一个简单示例:模拟管道中某一阶段的耗时任务
// 模拟耗时任务
func takeUpTimeTask(done <-chan interface{}, inStream <-chan interface{}) <-chan interface{} {
outStream := make(chan interface{})
go func() {
defer close(outStream)
for {
select {
case <-done:
return
case val ,ok := <-inStream:
if !ok {
return
}
select {
case <-done:
return
case outStream <- val:
// 模拟耗时任务
time.Sleep(time.Second)
}
}
}
}()
return outStream
}
我们来模拟调用运行它
// 没有扇入扇出处理
func Demo1() {
done := make(chan interface{})
defer close(done)
inStream := make(chan interface{})
go func() {
defer close(inStream)
for i := 0; i < 20; i++ {
inStream <- i
}
}()
// 对数据流执行耗时任务
resultChan := takeUpTimeTask(done, inStream)
// 使用chRange安全遍历打印
for val := range chRange(done,resultChan){
fmt.Printf("%v ", val)
}
}
结果输出:处理20个数据花费20s
=== RUN TestDemo1
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 --- PASS: TestDemo81 (20.05s)
PASS
这种情况很显然是可以用并发对耗时任务进行改进的,使用扇入扇出的思路可以这样做:
// 扇出处理耗时任务
func fanOut(done <-chan interface{}, chanStream chan interface{}) []<-chan interface{} {
numFinders := runtime.NumCPU()
finders := make([]<-chan interface{}, numFinders)
for i := 0; i < numFinders; i++ {
// 耗时任务的分流管道
finders[i] = takeUpTimeTask(done, chanStream)
}
return finders
}
// 扇入汇流结果通道
func fanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
// 管道汇流处理
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
// 从所有的通道中取数据
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
// 等待所有数据汇总完毕
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
改进之后我们再来调用它们:
// 扇入扇出处理
func Demo2() {
done := make(chan interface{})
defer close(done)
inStream := make(chan interface{})
go func() {
defer close(inStream)
for i := 0; i < 20; i++ {
inStream <- i
}
}()
// 扇入扇出执行耗时任务
resultChan := fanIn(done, fanOut(done, inStream)...)
// 使用chRange安全遍历打印
for val := range chRange(done,resultChan){
fmt.Printf("%v ", val)
}
}
结果输出:四个逻辑单元并发运行耗时5s,效率提升4倍
=== RUN TestDemo82
0 1 3 2 4 6 5 7 8 10 9 11 12 13 14 15 16 18 17 19 --- PASS: TestDemo82 (5.02s)
PASS
四、动态创建协程
根据需要来动态创建goroutine,并限制可并发的协程数。
// 设置默认最多开启5子协程
const maxRoutineNum = 5
func autoRoutine(wg *sync.WaitGroup, inStream <-chan int) {
for {
in, ok := <-inStream
if !ok {
break
}
// 自动开启协程的条件:输入流为偶数
if in%2 == 0 && runtime.NumGoroutine() < maxRoutineNum{
wg.Add(1)
go process(in, func() { wg.Done() })
} else {
process(in, nil)
}
}
}
func process(in int, callback func()) {
if callback != nil {
defer callback()
fmt.Printf("sub routine process:%d \n", in)
time.Sleep(1000000 * time.Microsecond)
}else{
fmt.Printf("parent routine process:%d \n", in)
time.Sleep(100000 * time.Microsecond)
}
}
调用示例:
// 动态创建协程
func Demo() {
wg := sync.WaitGroup{}
inStream := make(chan int)
wg.Add(1)
go func() {
defer wg.Done()
defer close(inStream)
for i := 1; i < 10; i++ {
inStream <- i
}
}()
autoRoutine(&wg, inStream)
wg.Wait()
}
运行结果:可以看到,程序根据要求最多开不超过5个子协程
=== RUN TestDemo121
parent routine process:1
sub routine process:2
parent routine process:3
parent routine process:5
sub routine process:4
parent routine process:6
parent routine process:7
parent routine process:8
parent routine process:9
--- PASS: TestDemo121 (1.21s)
PASS
以上只是演示该模式的一个简单例子,但它可以扩展到许多应用,如对不定数量的资源进行计算,其中某些资源比较耗时,需要开启协程提高执行效率,但协程数不能无限扩张。一个具体的例子就是使用go标准包的filepath.Walk()对系统特点目录的文件进行哈希计算,显然对某个目录的文件数是事先不确定的,且大文件的哈希计算耗时比较大,这就需要开启子协程计算,而协程又不能无限制扩张,因为系统对打开文件数又有限制。这种场景就非常适合使用这种模式,感兴趣的可自己实现一下。
网友评论