Channel
与map结构类似,channel
也是通过make
进行分配的,其返回值实际上是一个指向底层相关数据结构的引用。如果在创建channel时提供一个可选的整型参数,会设置该channel的缓冲区大小。该值缺省为0,用来构建默认的“无缓冲channel”,也称为“同步channel”。
ci := make(chan int) // unbuffered channel of integers
cj := make(chan int, 0) // unbuffered channel of integers
cs := make(chan *os.File, 100) // buffered channel of pointers to Files
无缓冲的channel使得通信—值的交换—和同步机制组合—共同保证了两个执行线索(Goroutines)运行于可控的状态。
对于channel,有很多巧妙的用法。我们通过以下示例开始介绍。通过使用channel,可以让发起操作的Gorouine等待排序操作的完成。
c := make(chan int) // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
list.Sort()
c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.
接收方会一直阻塞直到有数据到来。如果channel是无缓冲的,发送方会一直阻塞直到接收方将数据取出。如果channel带有缓冲区,发送方会一直阻塞直到数据被拷贝到缓冲区;如果缓冲区已满,则发送方只能在接收方取走数据后才能从阻塞状态恢复。
带缓冲区的channel可以像信号量一样使用,用来完成诸如吞吐率限制等功能。在以下示例中,到来的请求以参数形式传入handle函数,该函数从channel中读出一个值,然后处理请求,最后再向channel写入以使“信号量”可用,以便响应下一次处理。该channel的缓冲区容量决定了并发调用process函数的上限,因此在channel初始化时,需要传入相应的容量参数。
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
<-sem // Wait for active queue to drain.
process(r) // May take a long time.
sem <- 1 // Done; enable next request to run.
}
func init() {
for i := 0; i < MaxOutstanding; i++ {
sem <- 1
}
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}
由于在Go中,数据同步发生在从channel接收数据阶段(也就是说,发送操作发生在接收操作之前,因此获取信号量的操作必须实现在channel的接收阶段,而不是发送阶段。
这样的设计会引入一个问题: Serve
会为每个请求创建一个新的Goroutine,尽管在任意时刻只有最多MaxOutstanding
个可以执行。如果请求到来的速度过快,将迅速导致系统资源完全消耗。我们可以通过修改Serve
的实现来对Goroutine的创建进行限制。以下给出一个简单的实现,请注意其中包含一个BUG,我们会在后续进行修正:
func Serve(queue chan *Request) {
for req := range queue {
<-sem
go func() {
process(req) // Buggy; see explanation below.
sem <- 1
}()
}
}
刚才说的BUG源自Go中for循环的实现,循环的迭代变量会在循环中被重用,因此req变量会在所有Goroutine间共享。这不是我们所乐见的,我们需要保证req变量是每个Goroutine私有的。这里提供一个方法,将req的值以参数形式提供给goroutine对应的闭包:
func Serve(queue chan *Request) {
for req := range queue {
<-sem
go func(req *Request) {
process(req)
sem <- 1
}(req)
}
}
请与之前有BUG的实现进行对比,看看闭包在声明和运行上的不同之处。另一个解决方案是,干脆创建一个新的同名变量,示例如下:
func Serve(queue chan *Request) {
for req := range queue {
<-sem
req := req // Create new instance of req for the goroutine.
go func() {
process(req)
sem <- 1
}()
}
}
这样写可能看起来怪怪的
req := req
但它确实是合法的并且在Go中是一种惯用的方法。你可以如法泡制一个新的同名变量,用来为每个Goroutine创建循环变量的私有拷贝。
回到实现通用服务器的问题上来,另一个有效的资源管理途径是启动固定数量的handle Goroutine,每个Goroutine都直接从channel中读取请求。这个固定的数值就是同时执行process的最大并发数。Serve函数还需要一个额外的channel参数,用来等待退出通知;当创建完所有的Goroutine之后, Server 自身阻塞在该channel上等待结束信号。
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}
func Serve(clientRequests chan *Request, quit chan bool) {
// Start handlers
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // Wait to be told to exit.
}
Channel类型的Channel
Channel在Go语言中是一个 first-class 类型,这意味着channel可以像其他 first-class 类型变量一样进行分配、传递。该属性的一个常用方法是用来实现安全、并行的解复用(demultiplexing)处理。
handle
是一个理想化的请求处理,但我们并没有定义处理的类型。如果处理的类型中包括一个用来响应的channel,则每个客户端可以提供其独特的响应方式。这里提供一个简单的Request类型定义:
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
客户端提供了一个函数及其参数,以及一个内部的channel变量用来接收回答消息。
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)
在服务器端,只有处理函数handle需要改变:
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
显然,上述例子还有很大的优化空间以提高其可用性,但是这套代码已经可以作为一类对速度要求不高、并行、非阻塞式RPC系统的实现框架了,而且实现中没有使用任何显式的互斥语法。
小结
本文介绍了channel的常见用法以及利用go语言的特性给出了一个简单的RPC实现框架,希望对你能有帮助,更多信息,参考:GO 官方文档
网友评论