什么是屏障并发模式?
假设有这么一种场景,一个微服务需要调其他两个微服务的响应结果来返回结果给客户端。屏障并发模式在这里就有用武之地了。
屏障并发模式会使一个服务阻塞等待给客户端响应结果,直到从其他一个或多个不同的Goroutine(服务)中获取到返回内容。怎样才能使服务具有阻塞性质?我们可以用锁,但是 在Go中更习惯使用无缓冲通道。
目标
顾名思义,屏障模式就是让程序阻塞直到准备就绪为止。其目标就是:
- 组合来自一个或多个goroutine的数据。
- 判断这些来自不同goroutine返回的结果是否正确。必须要所有的goroutine返回的结果都正确才能继续执行。
聚合HTTP请求
例如,在一个微服务当中需要执行两个HTTP GET请求,然后将请求的返回结果组合成一个完整内容打印到控制台。这里HTTP 请求必须在不同的Goroutine中执行,只有两个请求都返回正确才会打印。如果任意请求返回错误,只打印错误。设计必须并发执行,可以利用多核并发执行HTTP请求。

在上面的图中,实线表示HTTP请求,虚线表示请求响应通道。main函数通过两个Goroutine发起两个请求。两个Goroutine共享一个通道将结果返回给main函数。这个应用程序的主要目标是获得两个不同调用的合并响应。
首先需要一个函数来为每个HTTP请求创建Goroutine。你还记得Goroutine之间如何通信的吧?没错是用channel!因此需要一个channel来处理请求的返回结果或错误。但可以稍微简化,将每个请求的响应和错误放在一个barrierResp结构体中。
代码实现
import (
"fmt"
"io/ioutil"
"net/http"
"time"
)
var timeoutMilliseconds int = 5000
type barrierResp struct {
Err error
Resp string
}
func barrier(endpoints ...string) {
requestNumber := len(endpoints)
in := make(chan barrierResp, requestNumber)
defer close(in)
responses := make([]barrierResp, requestNumber)
for _, endpoint := range endpoints {
go makeRequest(in, endpoint)
}
var hasError bool
for i := 0; i < requestNumber; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("ERROR: ", resp.Err)
hasError = true
}
responses[i] = resp
}
if !hasError {
for _, resp := range responses {
fmt.Println(resp.Resp)
}
}
}
代码很简单,每个Goroutine都返回一个barrierResp类型。通过一个缓冲通道来接收不同Goroutine的返回结果,并对返回的结果进行检查是否返回有错误。
func makeRequest(out chan<- barrierResp, url string) {
res := barrierResp{}
client := http.Client{
Timeout: time.Duration(timeoutMilliseconds) * time.Millisecond,
}
resp, err := client.Get(url)
if err != nil {
res.Err = err
out <- res
return
}
byt, err := ioutil.ReadAll(resp.Body)
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(byt)
out <- res
}
makeRequest函数很简单,接受一个channel来发送返回结果。这里通过http.Client设置请求超时。最后将请求结果通过out通道发送给barrier函数。
注意:发起HTTP请求必须设置超时时间,否则程序可能会阻塞。
网友评论