美文网首页
GO channels

GO channels

作者: 舍是境界 | 来源:发表于2021-12-29 06:56 被阅读0次

Channel

与map结构类似,channel也是通过make进行分配的,其返回值实际上是一个指向底层相关数据结构的引用。如果在创建channel时提供一个可选的整型参数,会设置该channel的缓冲区大小。该值缺省为0,用来构建默认的“无缓冲channel”,也称为“同步channel”。

ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0)         // unbuffered channel of integers
cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

无缓冲的channel使得通信—值的交换—和同步机制组合—共同保证了两个执行线索(Goroutines)运行于可控的状态。

对于channel,有很多巧妙的用法。我们通过以下示例开始介绍。通过使用channel,可以让发起操作的Gorouine等待排序操作的完成。

c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.

接收方会一直阻塞直到有数据到来。如果channel是无缓冲的,发送方会一直阻塞直到接收方将数据取出。如果channel带有缓冲区,发送方会一直阻塞直到数据被拷贝到缓冲区;如果缓冲区已满,则发送方只能在接收方取走数据后才能从阻塞状态恢复。

带缓冲区的channel可以像信号量一样使用,用来完成诸如吞吐率限制等功能。在以下示例中,到来的请求以参数形式传入handle函数,该函数从channel中读出一个值,然后处理请求,最后再向channel写入以使“信号量”可用,以便响应下一次处理。该channel的缓冲区容量决定了并发调用process函数的上限,因此在channel初始化时,需要传入相应的容量参数。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    <-sem          // Wait for active queue to drain.
    process(r)     // May take a long time.
    sem <- 1       // Done; enable next request to run.
}

func init() {
    for i := 0; i < MaxOutstanding; i++ {
        sem <- 1
    }
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Don't wait for handle to finish.
    }
}

由于在Go中,数据同步发生在从channel接收数据阶段(也就是说,发送操作发生在接收操作之前,因此获取信号量的操作必须实现在channel的接收阶段,而不是发送阶段。

这样的设计会引入一个问题: Serve会为每个请求创建一个新的Goroutine,尽管在任意时刻只有最多MaxOutstanding个可以执行。如果请求到来的速度过快,将迅速导致系统资源完全消耗。我们可以通过修改Serve的实现来对Goroutine的创建进行限制。以下给出一个简单的实现,请注意其中包含一个BUG,我们会在后续进行修正:

func Serve(queue chan *Request) {
    for req := range queue {
        <-sem
        go func() {
            process(req) // Buggy; see explanation below.
            sem <- 1
        }()
    }
}

刚才说的BUG源自Go中for循环的实现,循环的迭代变量会在循环中被重用,因此req变量会在所有Goroutine间共享。这不是我们所乐见的,我们需要保证req变量是每个Goroutine私有的。这里提供一个方法,将req的值以参数形式提供给goroutine对应的闭包:

func Serve(queue chan *Request) {
    for req := range queue {
        <-sem
        go func(req *Request) {
            process(req)
            sem <- 1
        }(req)
    }
}

请与之前有BUG的实现进行对比,看看闭包在声明和运行上的不同之处。另一个解决方案是,干脆创建一个新的同名变量,示例如下:

func Serve(queue chan *Request) {
    for req := range queue {
        <-sem
        req := req // Create new instance of req for the goroutine.
        go func() {
            process(req)
            sem <- 1
        }()
    }
}

这样写可能看起来怪怪的

req := req

但它确实是合法的并且在Go中是一种惯用的方法。你可以如法泡制一个新的同名变量,用来为每个Goroutine创建循环变量的私有拷贝。

回到实现通用服务器的问题上来,另一个有效的资源管理途径是启动固定数量的handle Goroutine,每个Goroutine都直接从channel中读取请求。这个固定的数值就是同时执行process的最大并发数。Serve函数还需要一个额外的channel参数,用来等待退出通知;当创建完所有的Goroutine之后, Server 自身阻塞在该channel上等待结束信号。

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // Wait to be told to exit.
}

Channel类型的Channel

Channel在Go语言中是一个 first-class 类型,这意味着channel可以像其他 first-class 类型变量一样进行分配、传递。该属性的一个常用方法是用来实现安全、并行的解复用(demultiplexing)处理。

handle是一个理想化的请求处理,但我们并没有定义处理的类型。如果处理的类型中包括一个用来响应的channel,则每个客户端可以提供其独特的响应方式。这里提供一个简单的Request类型定义:

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

客户端提供了一个函数及其参数,以及一个内部的channel变量用来接收回答消息。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)

在服务器端,只有处理函数handle需要改变:

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

显然,上述例子还有很大的优化空间以提高其可用性,但是这套代码已经可以作为一类对速度要求不高、并行、非阻塞式RPC系统的实现框架了,而且实现中没有使用任何显式的互斥语法。

小结

本文介绍了channel的常见用法以及利用go语言的特性给出了一个简单的RPC实现框架,希望对你能有帮助,更多信息,参考:GO 官方文档

相关文章

  • Golang 使用tee将一个channel分拆成两个相同的ch

    tees/channels.go teedemo.go 程序输出如下

  • Channels In Go

    本文翻译自Channels In Go Channel是Go中一个重要的内置功能。这是让Go独一无二的功能之一,除...

  • GO channels

    Channel 与map结构类似,channel也是通过make进行分配的,其返回值实际上是一个指向底层相关数据结...

  • go 圣经 摘抄 第 8 章

    Chapter 8 Goroutines and Channels Go enable two styles of...

  • Go中的Channel——range和select

    译自Channels in Go - range and select,该文章分为两部分,第一部分的翻译见Go中的...

  • Go教程第十九篇: Channels

    Go教程第十九篇: Channels 本文是《Go系列教程》的第十九篇文章。 在上一篇文章中,我们讨论了并发是如何...

  • (转)Concurrency in Go 3--Channels

    通道也可以声明为仅支持单向数据流——即你可以定义仅支持发送或仅支持接收数据的通道。我将在本节的末尾解释单向数据流的...

  • 给自己的区块链添加网络通信

    准备工作: go语言中Channels的用法:在不同的Goroutine中运行的函数之间传递数据,可以使用Chan...

  • 什么是chan

    如果说 goroutine 是 Go语言程序的并发体的话,那么 channels 就是它们之间的通信机制。一个 c...

  • channel学习

    一、channle的基本概念 channels是go中不同goroutines交互数据的一种通道,也就是说如果两个...

网友评论

      本文标题:GO channels

      本文链接:https://www.haomeiwen.com/subject/chyjqrtx.html