本文是《Go系列教程》的第二十篇文章。
什么是缓冲区通道
我们前面所讲的大多数都是基本的非缓冲区通道。正如我们在通道中所讲的,读写一个非缓冲区通道都是阻塞操作。当然了,除此之外,我们还可以创建带有缓冲区的通道。只有当缓冲区满时,向缓冲区中发送数据才会被阻塞住。类似地,只有当缓冲区为空时,从缓存区中读取数据才会被阻塞住。
通过给make函数传递一个capacity参数,我们即可创建一个带有缓冲区的通道。
ch := make(chan type, capacity)
其中,capacity的值必须大于0,我们来写段代码,用以创建一个带有缓冲区的通道。
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
在上面的程序中,我们创建了一个容量为2的缓冲区通道。因为通道的容量是2,故而我们可以向此通道中无阻塞地写入2个字符串。然后我们又分别从通道中读取俩个字符串,程序的测试如下:
naveen
paul
案例二
我们再来看一个程序,在这个程序中,我们在并发的Goroutine中向缓冲区通道中写入数据,并在main函数的Goroutine中读取数据。下面这个程序将有助于我们理解向缓冲区通道写入时阻塞的情况。
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
在上面的程序中,我们首先创建了一个容量为2的缓冲区通道,并把它传递给write函数。之后,main函数的Goroutine会睡眠2秒。在此期间,就会并发地执行write函数的Goroutine。在write函数的Goroutine内部有一个for循环,此for循环会不断地把从0到4的数据写入到名为ch的channel中。由于此缓冲区通道的容量为2,所以,write函数的Goroutine可以立即向通道中写入0和1。之后,它就会被阻塞住直到有一个值从ch通道中读出来。因此,程序将理解打印出如下俩行。
successfully wrote 0 to ch
successfully wrote 1 to ch
在打印出如下俩行之后,向ch通道的写入操作就会被阻塞住,直到有人从ch通道中读取。由于在开始从通道中读取之前,main函数的Goroutine睡眠了2秒,所以,程序在接下来的2秒钟不会打印任何内容。
2秒之后,main函数的Goroutine就会醒来,并利用for循环从ch通道中执行读取操作,打印出读取出来的值,之后再次睡眠2秒,往复循环,直到ch通道关闭。因此,程序在2秒之后,将打印如下内容.
read value 0 from ch
successfully wrote 2 to ch
程序会继续运行,直到所有的值都被写入到通道中,并且在write的Goroutine中关闭通道。最终的程序输出如下:
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
死锁
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
在上面的程序中,我们向容量为2的缓冲区通道中写入3个字符串。当程序走到底三个写操作的时候,就会被阻塞住,因为此时通道已经满了。此时为了能使写操作继续,必须得有其他的Goroutine从此通道中执行读取操作。但是在这种情形下,并不存在其他并发的Goroutine从此通道中读取数据,因此,程序就会死锁,程序就会在运行时报出如下错误信息:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
长度 VS 容量
缓冲区通道的容量指的是此通道能容纳的数据值的数量。此值就是我们在使用make函数创建通道时指定的参数。通道的长度指的是此通道中的元素数量。用一段程序来解释一下。
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
在上面的程序中,我们创建了一个容量为3的通道,就是说,它能装3个字符串。当我们向此通道中写入了2个字符串之后,此时通道中就有了2个字符串了。
因此,它的长度就是2。之后,我们又从此通道中读取了一个字符串,那么,这时候,通道中就只有一个字符串了。因此,它的长度就变成了1。此程序输出如下:
capacity is 3
length is 2
read value naveen
new length is 1
等待组
在下一部分中,我们要学习线程池。要理解线程池,我们需要先学习一下waitGroup等待组。因为它是线程池的实现。waitGroup可用于等待一组Goroutine运行结束。如果所有的Goroutine都没有执行完成的话,程序就会阻塞住。比如说,我们在main函数的Goroutine中创建了3个并发执行的Goroutine。但是main函数的Goroutine在终止之前,需要等待其他3个Goroutine运行完成。此时,就可以使用WaitGroup来实现。
我们先把理论停下,写段实际的代码看下:
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroup是一个结构体类型,在18行,我们创建了一个WaitGroup。这种方式下,WaitGroup就充当一个计数器。当我们调用Add方法,并传递给它一个int值之后,WaitGroup的计数器的值就增加相应的int值。要想减少计数器的值,我们可以调用Done()方法。Wait()方法会一直阻塞在调用它的地方,直到计数器的值变为0。
在上面的程序中,我们在for循环里面调用了wg.Add(1),此循环会迭代3次。因此,此时计数器的值变为了3。同时,for循环也创建了3个process的Goroutine。之后在main函数里面调用了wait()方法,那么此时,main函数的Goroutine就会阻塞住,直到计数器的值变为0。在process的Goroutine里面,我们可以调用wg.Done()函数递减计数器的值。
一旦这三个Goroutine运行完成,即:wg.Done被调用了3次。计数器将会变成0,main函数就解除阻塞。
重要的是, go process(i, &wg)这行中,我们传入的值一定要是wg的地址,不然的话,每一个Goroutine都拥有自己单独的WaitGroup拷贝,这样,就实现不了预期的目的。
当他们都执行完成的时候,也不会通知main函数。
程序的输出如下:
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
你们的输出可能和我的不完全一样,这是因为Goroutine的执行顺序不同。
线程池实现
缓冲区通道最重要的用途之一,便是实现worker pool。
通常,一个线程池就是一组等待执行所分配任务的线程。一旦,他们完成了各自分配的任务之后,就处于准备接受下次任务的状态。
我们将使用缓冲区通道来实现线程池。我们的线程池可以实现给定一个输入数值,能够计算出每一个数位上的数字的和。例如,我们传一个234给此线程池,其输出结果将是9即(2+3+4) 。
下面这些都是我们的线程池的核心功能:
-
创建一个Goroutine线程池,该线程池会监听输入缓冲区通道,等待任务分配
-
向此输入缓冲区通道中添加任务
-
在任务完成后,把执行结果写入到一个输出缓冲区通道中
-
从输入缓冲区通道中读取和打印结果
我们将一步一步地把这个程序写出来,以便于理解。
第一步: 创建结构体,用以表示任务和任务的执行结果
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
每一个Job结构体都有一个id和一个randomno,其中randommo是每一个数位上的数字的和。
结构体Result有一个job字段和一个sumofdigits字段。
下一步就是创建缓冲区通道,用以接收任务以及写入输出结果。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
线程池中的Goroutine会在缓冲区通道jobs上监听新的任务。一旦任务完成,就会把结果写入到results缓冲区通道。
下面的这个digits 函数会查找每一个数位上的数字,并计算出总和返回。我们在这个函数内部增加了一个sleep调用,睡眠2秒钟,用以模拟这个任务的耗时。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
下一步,我们写一个函数,用于创建worker Goroutine。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
在上面的函数中,我们创建了一个worker,此worker从jobs通道中读取数据,并使用当前的job创建一个Result结构体,以及返回digits函数的的返回值,之后把结果写入到results缓冲区通道中。另外,此函数会接收一个等待组wg 作为参数,当所有的任务都执行完成后,就会调用wg的Done方法。
createWorkerPool 函数负责创建一个一个worker Goroutine的线程池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
在上面的函数,会接收一个参数作为worker的数量。在创建Goroutine之前,会先调用wg.Add(1)来增加WaitGroup的计数器。
之后,把等待组wg的地址作为参数传递给worker函数,worker函数就会创建一个worker Groutine,在创建好需要的worker Goroutine之后,就调用了wg.wait()方法,等待所有的Goroutine完成他们的执行。在所有的Goroutine完成之后,它就会把results通道关闭,因为所有的Goroutine都执行完成了,不再向results通道中执行写入操作。
现在,既然我们已经有了一个线程池了,我们就继续往下走。我们来写个函数专门用于给worker分配任务。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
上面的allocate 函数会生成一批随机数,并使用随机数创建Job结构体。for循环的计数器i会当做id,写入到jobs通道中。在所有的工作完成之后,关闭jobs通道。
下一步,创建一个可以读取results通道,并把读取数据打印出来的函数。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
result函数会从results通道中进行读取,并打印出job的id,输入的随机数,随机数的数位和。此外,此result函数还接收了一个done通道作为参数,当它把所有结果打印完成后,即把true写入到done通道中。
现在我们一切都已经准备就绪,我们现在就需要完成最后一步,即:在main函数中调用上面这些方法。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
我们首先记录了一下程序执行的开始时间,之后,在程序结束之后,我们计算开始时间和结束时间的差值,统计出程序运行的总耗时。这样做是必要的,因为我们需要通过改变Goroutine的
数量来做基准测试。
把100赋值给noOfJobs,然后调用allocate()方法把任务添加到jobs通道中。之后,创建了一个done通道,并传递给result。这样的话,result就会打印输出,并在一切完成之后,发出通知。
最后,调用createWorkerPool函数创建了有10个worker Goroutine的线程池,之后,main函数在done通道上等待所有结果输出。完整的程序如下:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
运行上面的程序,程序的输出如下:
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
感谢您的阅读,请留下您珍贵的反馈和评论。Have a good Day!
备注
本文系翻译之作原文博客地址
网友评论