美文网首页
GO channels

GO channels

作者: 舍是境界 | 来源:发表于2021-12-29 06:56 被阅读0次

    Channel

    与map结构类似,channel也是通过make进行分配的,其返回值实际上是一个指向底层相关数据结构的引用。如果在创建channel时提供一个可选的整型参数,会设置该channel的缓冲区大小。该值缺省为0,用来构建默认的“无缓冲channel”,也称为“同步channel”。

    ci := make(chan int)            // unbuffered channel of integers
    cj := make(chan int, 0)         // unbuffered channel of integers
    cs := make(chan *os.File, 100)  // buffered channel of pointers to Files
    

    无缓冲的channel使得通信—值的交换—和同步机制组合—共同保证了两个执行线索(Goroutines)运行于可控的状态。

    对于channel,有很多巧妙的用法。我们通过以下示例开始介绍。通过使用channel,可以让发起操作的Gorouine等待排序操作的完成。

    c := make(chan int)  // Allocate a channel.
    // Start the sort in a goroutine; when it completes, signal on the channel.
    go func() {
        list.Sort()
        c <- 1  // Send a signal; value does not matter.
    }()
    doSomethingForAWhile()
    <-c   // Wait for sort to finish; discard sent value.
    

    接收方会一直阻塞直到有数据到来。如果channel是无缓冲的,发送方会一直阻塞直到接收方将数据取出。如果channel带有缓冲区,发送方会一直阻塞直到数据被拷贝到缓冲区;如果缓冲区已满,则发送方只能在接收方取走数据后才能从阻塞状态恢复。

    带缓冲区的channel可以像信号量一样使用,用来完成诸如吞吐率限制等功能。在以下示例中,到来的请求以参数形式传入handle函数,该函数从channel中读出一个值,然后处理请求,最后再向channel写入以使“信号量”可用,以便响应下一次处理。该channel的缓冲区容量决定了并发调用process函数的上限,因此在channel初始化时,需要传入相应的容量参数。

    var sem = make(chan int, MaxOutstanding)
    
    func handle(r *Request) {
        <-sem          // Wait for active queue to drain.
        process(r)     // May take a long time.
        sem <- 1       // Done; enable next request to run.
    }
    
    func init() {
        for i := 0; i < MaxOutstanding; i++ {
            sem <- 1
        }
    }
    
    func Serve(queue chan *Request) {
        for {
            req := <-queue
            go handle(req)  // Don't wait for handle to finish.
        }
    }
    

    由于在Go中,数据同步发生在从channel接收数据阶段(也就是说,发送操作发生在接收操作之前,因此获取信号量的操作必须实现在channel的接收阶段,而不是发送阶段。

    这样的设计会引入一个问题: Serve会为每个请求创建一个新的Goroutine,尽管在任意时刻只有最多MaxOutstanding个可以执行。如果请求到来的速度过快,将迅速导致系统资源完全消耗。我们可以通过修改Serve的实现来对Goroutine的创建进行限制。以下给出一个简单的实现,请注意其中包含一个BUG,我们会在后续进行修正:

    func Serve(queue chan *Request) {
        for req := range queue {
            <-sem
            go func() {
                process(req) // Buggy; see explanation below.
                sem <- 1
            }()
        }
    }
    

    刚才说的BUG源自Go中for循环的实现,循环的迭代变量会在循环中被重用,因此req变量会在所有Goroutine间共享。这不是我们所乐见的,我们需要保证req变量是每个Goroutine私有的。这里提供一个方法,将req的值以参数形式提供给goroutine对应的闭包:

    func Serve(queue chan *Request) {
        for req := range queue {
            <-sem
            go func(req *Request) {
                process(req)
                sem <- 1
            }(req)
        }
    }
    

    请与之前有BUG的实现进行对比,看看闭包在声明和运行上的不同之处。另一个解决方案是,干脆创建一个新的同名变量,示例如下:

    func Serve(queue chan *Request) {
        for req := range queue {
            <-sem
            req := req // Create new instance of req for the goroutine.
            go func() {
                process(req)
                sem <- 1
            }()
        }
    }
    

    这样写可能看起来怪怪的

    req := req
    

    但它确实是合法的并且在Go中是一种惯用的方法。你可以如法泡制一个新的同名变量,用来为每个Goroutine创建循环变量的私有拷贝。

    回到实现通用服务器的问题上来,另一个有效的资源管理途径是启动固定数量的handle Goroutine,每个Goroutine都直接从channel中读取请求。这个固定的数值就是同时执行process的最大并发数。Serve函数还需要一个额外的channel参数,用来等待退出通知;当创建完所有的Goroutine之后, Server 自身阻塞在该channel上等待结束信号。

    func handle(queue chan *Request) {
        for r := range queue {
            process(r)
        }
    }
    
    func Serve(clientRequests chan *Request, quit chan bool) {
        // Start handlers
        for i := 0; i < MaxOutstanding; i++ {
            go handle(clientRequests)
        }
        <-quit  // Wait to be told to exit.
    }
    

    Channel类型的Channel

    Channel在Go语言中是一个 first-class 类型,这意味着channel可以像其他 first-class 类型变量一样进行分配、传递。该属性的一个常用方法是用来实现安全、并行的解复用(demultiplexing)处理。

    handle是一个理想化的请求处理,但我们并没有定义处理的类型。如果处理的类型中包括一个用来响应的channel,则每个客户端可以提供其独特的响应方式。这里提供一个简单的Request类型定义:

    type Request struct {
        args        []int
        f           func([]int) int
        resultChan  chan int
    }
    

    客户端提供了一个函数及其参数,以及一个内部的channel变量用来接收回答消息。

    func sum(a []int) (s int) {
        for _, v := range a {
            s += v
        }
        return
    }
    
    request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
    // Send request
    clientRequests <- request
    // Wait for response.
    fmt.Printf("answer: %d\n", <-request.resultChan)
    

    在服务器端,只有处理函数handle需要改变:

    func handle(queue chan *Request) {
        for req := range queue {
            req.resultChan <- req.f(req.args)
        }
    }
    

    显然,上述例子还有很大的优化空间以提高其可用性,但是这套代码已经可以作为一类对速度要求不高、并行、非阻塞式RPC系统的实现框架了,而且实现中没有使用任何显式的互斥语法。

    小结

    本文介绍了channel的常见用法以及利用go语言的特性给出了一个简单的RPC实现框架,希望对你能有帮助,更多信息,参考:GO 官方文档

    相关文章

      网友评论

          本文标题:GO channels

          本文链接:https://www.haomeiwen.com/subject/chyjqrtx.html