本文将会展示许多channel使用案例。 我希望这篇文字能使你在以下几个方面更加便利:
- 用channel进行异步和并发编程
- channel的同步技术有比异步编程概念,例如其他语言中的actor模型和async/await 模式更广泛的应用和更加灵活的特点。
请注意这点,本文的目的在于展示尽可能多的的channel使用案例。我们应该明白channel不是go语言支持的唯一的并发编程技术,在某些情况下,channel也不是最好的解决方案。想了解更多并发编程技术,请了解原子操作和一些其他同步技术。
将channel当做Futures/Promises
Futures/Promises在许多其他语言中都有使用。他们经常与请求和响应相关。
返回仅能接受的channel作为结果
下面的例子中,函数sumSquares
的灵活参数调用的值被并发的请求。两个通道中的没有一个接收操作将阻塞,知道一个发送操作执行在相关的channel上。它话费3秒而不是6秒返回最终结果。
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest() <-chan int32 {
r := make(chan int32)
go func() {
time.Sleep(time.Second * 3) // simulate a workload
r <- rand.Int31n(100)
}()
return r
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano())
a, b := longTimeRequest(), longTimeRequest()
fmt.Println(sumSquares(<-a, <-b))
}
将只能发送的channel作为参数
与上面的例子相同,下面的例子中,对有两个参数的函数sumSquares
的调用的值被并发调用。不同于上一个例子,longTimeRequest
函数采用一个仅能发送的channel作为参数,而不是返回一个仅能接受的channel结果。
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest(r chan<- int32) {
time.Sleep(time.Second * 3)
r <- rand.Int31n(100)
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano())
ra, rb := make(chan int32), make(chan int32)
go longTimeRequest(ra)
go longTimeRequest(rb)
fmt.Println(sumSquares(<-ra, <-rb))
}
事实上,上面的例子中,我们不需要两个channel来传递结果,使用一个就行了
...
results := make(chan int32, 2) // can be buffered or not
go longTimeRequest(results)
go longTimeRequest(results)
fmt.Println(sumSquares(<-results, <-results))
}
第一个响应夺冠
这是上面的例子中只使用一个channel的加强版
有时候,一个份数据可以从多个源接受,以避免高延迟。由于许多因素,这些来源的响应时间差别可能很多。即使对于指定的源,其响应持续时间也不是恒定的。为了使响应持续时间尽可能短,我们可以向分离的goroutine中的每个源发送请求。只会使用第一个响应,其他较慢的响应将被丢弃。
注意,如果有N个源,则通信信道的容量必须至少为N-1,以避免goroutines对应被丢弃的响应被永久阻止。
package main
import (
"fmt"
"time"
"math/rand"
)
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3) + 1
time.Sleep(time.Duration(rb) * time.Second) // sleep 1s, 2s or 3s
c <- ra
}
func main() {
rand.Seed(time.Now().UnixNano())
startTime := time.Now()
c := make(chan int32, 5) // need a buffered channel
for i := 0; i < cap(c); i++ {
go source(c)
}
rnd := <- c // only the first response is used
fmt.Println(time.Since(startTime))
fmt.Println(rnd)
}
通过使用select机制和容量为1的缓冲通道,还有其他一些方法可以实现第一个响应 - 获胜用例。其他方式将在下面介绍。
更多变相的请求-响应案例
由于参数和结果 channel能被缓冲,使得响应侧不需要等待请求侧取出传递的值。
有时候,一个请求并不保证能返回一个合法的值。相反,由于各种原因,可能返回一个错误。对应这种情况,我们可以使用一个这样的结构体:struct{v T;err error}
或者一个空interface类型作为channel的元素类型。
有时候,由于一些原因,响应需要比预期更长的时间来返回,或者将永远不会返回。我们可以使用timeout来处理这种case。
有时候,响应侧会返回一系列数据,这就是后面要提到的数据流机制。
用channel做通知
通过发送一个值给channel实现1对1的通知
package main
import (
"crypto/rand"
"fmt"
"os"
"sort"
)
func main() {
values := make([]byte, 32 * 1024 * 1024)
if _, err := rand.Read(values); err != nil {
fmt.Println(err)
os.Exit(1)
}
done := make(chan struct{})
go func() { // the sorting goroutine
sort.Slice(values, func(i, j int) bool {
return values[i] < values[j]
})
done <- struct{}{} //通知排序结束
}()
// do some other things ...
<- done // 在此处等待通知
fmt.Println(values[0], values[len(values)-1])
N对1,1对N的通知
package main
import "log"
import "time"
func worker(id int, ready <-chan struct{}, done chan<- struct{}) {
<-ready // block here and wait a notification
log.Print("Worker#", id, " started to process.")
time.Sleep(time.Second) // simulate a workload
log.Print("Worker#", id, " finished its job.")
done <- struct{}{} // notify the main goroutine (N-to-1)
}
func main() {
log.SetFlags(0)
ready, done := make(chan struct{}), make(chan struct{})
go worker(0, ready, done)
go worker(1, ready, done)
go worker(2, ready, done)
time.Sleep(time.Second * 2) // simulate an initialization phase
// 1-to-N notifications.
ready <- struct{}{}; ready <- struct{}{}; ready <- struct{}{}
// Being N-to-1 notified.
<-done; <-done; <-done
}
在实践中1对n和n对1通知很少使用。我们经常用sync.WaitGroup
做n对1的通知,用关闭channel实现1对n的通知。
限速器
我们也可以使用try-send来进行速率限制(在自动收报机的帮助下)。在实践中,限速通常是为了避免配额超出和资源耗尽。
以下显示了从官方Go wiki借来的这样一个例子。在此示例中,任何一分钟持续时间内处理的请求数不会超过200。
package main
import "fmt"
import "time"
type Request interface{}
func handle(r Request) {fmt.Println(r.(int))}
const RateLimitPeriod = time.Minute
const RateLimit = 200 // most 200 requests in one minute
func handleRequests(requests <-chan Request) {
quotas := make(chan time.Time, RateLimit)
go func() {
tick := time.NewTicker(RateLimitPeriod / RateLimit)
defer tick.Stop()
for t := range tick.C {
select {
case quotas <- t:
default:
}
}
}()
for r := range requests {
<-quotas
go handle(r)
}
}
func main() {
requests := make(chan Request)
go handleRequests(requests)
// time.Sleep(time.Minute)
for i := 0; ; i++ {requests <- i}
}
网友评论