缓冲信道
之前看到的都是无缓冲信道,无缓冲信道的发送和接收过程是阻塞的。我们还可以创建一个有缓冲(Buffer)的信道。
- 只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。
- 只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
通过向 make 函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。
ch := make(chan type, capacity)
要让一个信道有缓冲,上面语法中的 capacity 应该大于 0。无缓冲信道的容量默认为 0。我们在上一章创建信道时,省略了容量参数。
示例1:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。
示例2:
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
死锁
向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时,由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic
长度 vs 容量
- 容量:指信道可以存储的值的数量。我们在使用 make 函数创建缓冲信道的时候会指定容量大小。
- 长度:指信道中当前排队的元素个数。
示例代码:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
WaitGroup
WaitGroup 用于实现工作池,因此要理解工作池,我们首先需要了解 WaitGroup。
WaitGroup 用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup 来实现。
WaitGroup原理:
WaitGroup 使用计数器来工作。
调用WaitGroup
的Add
并传递一个int
时,WaitGroup 的计数器会增加上Add
的传参。
要减少计数器,可以调用WaitGroup
的Done()
方法。
Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。
代码:
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
解释代码:
-
WaitGroup 是一个结构体类型,创建
WaitGroup
类型的变量,其初始值为零值。 - for 循环迭代了 3 次,在循环内调用了 wg.Add(1)。因此计数器变为 3。for 循环同样创建了 3 个 process 协程
- 调用了 wg.Wait(),确保 Go 主协程等待计数器变为 0
- process 协程内调用了 wg.Done,可以让计数器递减。一旦 3 个子协程都执行完毕(即 wg.Done() 调用了 3 次),那么计数器就变为 0,于是主协程会解除阻塞。
注意:
process函数中传递 wg 的地址是很重要的。如果没有传递 wg 的地址,那么每个 Go 协程将会得到一个 WaitGroup 值的拷贝,因而当它们执行结束时,main 函数并不会知道。
由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。
工作池的实现
缓冲信道的重要应用之一就是实现工作池。
一般而言,工作池就是一组等待任务分配的线程。
一旦完成了所分配的任务,这些线程可继续等待任务的分配。
用一个需求说明工作池:
计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列随机数。
我们工作池的核心功能如下:
- 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
- 将作业添加到该输入型缓冲信道中。
- 作业完成后,再将结果写入一个输出型缓冲信道。
- 从输出型缓冲信道读取并打印结果。
网友评论