美文网首页
Channel使用案例分享

Channel使用案例分享

作者: golang推广大使 | 来源:发表于2019-03-11 23:24 被阅读0次

本文将会展示许多channel使用案例。 我希望这篇文字能使你在以下几个方面更加便利:

  • 用channel进行异步和并发编程
  • channel的同步技术有比异步编程概念,例如其他语言中的actor模型和async/await 模式更广泛的应用和更加灵活的特点。

请注意这点,本文的目的在于展示尽可能多的的channel使用案例。我们应该明白channel不是go语言支持的唯一的并发编程技术,在某些情况下,channel也不是最好的解决方案。想了解更多并发编程技术,请了解原子操作和一些其他同步技术。

将channel当做Futures/Promises

Futures/Promises在许多其他语言中都有使用。他们经常与请求和响应相关。

返回仅能接受的channel作为结果

下面的例子中,函数sumSquares的灵活参数调用的值被并发的请求。两个通道中的没有一个接收操作将阻塞,知道一个发送操作执行在相关的channel上。它话费3秒而不是6秒返回最终结果。

package main

import (
    "time"
    "math/rand"
    "fmt"
)

func longTimeRequest() <-chan int32 {
    r := make(chan int32)

    go func() {
        time.Sleep(time.Second * 3) // simulate a workload
        r <- rand.Int31n(100)
    }()

    return r
}

func sumSquares(a, b int32) int32 {
    return a*a + b*b
}

func main() {
    rand.Seed(time.Now().UnixNano())

    a, b := longTimeRequest(), longTimeRequest()
    fmt.Println(sumSquares(<-a, <-b))
}

将只能发送的channel作为参数

与上面的例子相同,下面的例子中,对有两个参数的函数sumSquares的调用的值被并发调用。不同于上一个例子,longTimeRequest函数采用一个仅能发送的channel作为参数,而不是返回一个仅能接受的channel结果。

package main

import (
    "time"
    "math/rand"
    "fmt"
)

func longTimeRequest(r chan<- int32)  {
    time.Sleep(time.Second * 3)
    r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
    return a*a + b*b
}

func main() {
    rand.Seed(time.Now().UnixNano())

    ra, rb := make(chan int32), make(chan int32)
    go longTimeRequest(ra)
    go longTimeRequest(rb)

    fmt.Println(sumSquares(<-ra, <-rb))
}

事实上,上面的例子中,我们不需要两个channel来传递结果,使用一个就行了

...
results := make(chan int32, 2) // can be buffered or not
    go longTimeRequest(results)
    go longTimeRequest(results)

    fmt.Println(sumSquares(<-results, <-results))
}

第一个响应夺冠

这是上面的例子中只使用一个channel的加强版

有时候,一个份数据可以从多个源接受,以避免高延迟。由于许多因素,这些来源的响应时间差别可能很多。即使对于指定的源,其响应持续时间也不是恒定的。为了使响应持续时间尽可能短,我们可以向分离的goroutine中的每个源发送请求。只会使用第一个响应,其他较慢的响应将被丢弃。
注意,如果有N个源,则通信信道的容量必须至少为N-1,以避免goroutines对应被丢弃的响应被永久阻止。

package main

import (
    "fmt"
    "time"
    "math/rand"
)

func source(c chan<- int32) {
    ra, rb := rand.Int31(), rand.Intn(3) + 1
    time.Sleep(time.Duration(rb) * time.Second) // sleep 1s, 2s or 3s
    c <- ra
}

func main() {
    rand.Seed(time.Now().UnixNano())

    startTime := time.Now()
    c := make(chan int32, 5) // need a buffered channel
    for i := 0; i < cap(c); i++ {
        go source(c)
    }
    rnd := <- c // only the first response is used
    fmt.Println(time.Since(startTime))
    fmt.Println(rnd)
}

通过使用select机制和容量为1的缓冲通道,还有其他一些方法可以实现第一个响应 - 获胜用例。其他方式将在下面介绍。

更多变相的请求-响应案例

由于参数和结果 channel能被缓冲,使得响应侧不需要等待请求侧取出传递的值。
有时候,一个请求并不保证能返回一个合法的值。相反,由于各种原因,可能返回一个错误。对应这种情况,我们可以使用一个这样的结构体:struct{v T;err error}或者一个空interface类型作为channel的元素类型。
有时候,由于一些原因,响应需要比预期更长的时间来返回,或者将永远不会返回。我们可以使用timeout来处理这种case。

有时候,响应侧会返回一系列数据,这就是后面要提到的数据流机制。

用channel做通知

通过发送一个值给channel实现1对1的通知

package main

import (
    "crypto/rand"
    "fmt"
    "os"
    "sort"
)

func main() {
    values := make([]byte, 32 * 1024 * 1024)
    if _, err := rand.Read(values); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    done := make(chan struct{})
    go func() { // the sorting goroutine
        sort.Slice(values, func(i, j int) bool {
            return values[i] < values[j]
        })
        done <- struct{}{} //通知排序结束
    }()

    // do some other things ...

    <- done // 在此处等待通知
    fmt.Println(values[0], values[len(values)-1])

N对1,1对N的通知

package main

import "log"
import "time"

func worker(id int, ready <-chan struct{}, done chan<- struct{}) {
    <-ready // block here and wait a notification
    log.Print("Worker#", id, " started to process.")
    time.Sleep(time.Second) // simulate a workload
    log.Print("Worker#", id, " finished its job.")
    done <- struct{}{} // notify the main goroutine (N-to-1)
}

func main() {
    log.SetFlags(0)

    ready, done := make(chan struct{}), make(chan struct{})
    go worker(0, ready, done)
    go worker(1, ready, done)
    go worker(2, ready, done)

    time.Sleep(time.Second * 2) // simulate an initialization phase
    // 1-to-N notifications.
    ready <- struct{}{}; ready <- struct{}{}; ready <- struct{}{}
    // Being N-to-1 notified.
    <-done; <-done; <-done
}

在实践中1对n和n对1通知很少使用。我们经常用sync.WaitGroup做n对1的通知,用关闭channel实现1对n的通知。

限速器

我们也可以使用try-send来进行速率限制(在自动收报机的帮助下)。在实践中,限速通常是为了避免配额超出和资源耗尽。
以下显示了从官方Go wiki借来的这样一个例子。在此示例中,任何一分钟持续时间内处理的请求数不会超过200。

package main

import "fmt"
import "time"

type Request interface{}
func handle(r Request) {fmt.Println(r.(int))}

const RateLimitPeriod = time.Minute
const RateLimit = 200 // most 200 requests in one minute

func handleRequests(requests <-chan Request) {
    quotas := make(chan time.Time, RateLimit)

    go func() {
        tick := time.NewTicker(RateLimitPeriod / RateLimit)
        defer tick.Stop()
        for t := range tick.C {
            select {
            case quotas <- t:
            default:
            }
        }
    }()

    for r := range requests {

        <-quotas
        go handle(r)
    }
}

func main() {
    requests := make(chan Request)
    go handleRequests(requests)
    // time.Sleep(time.Minute)
    for i := 0; ; i++ {requests <- i}
}

相关文章

网友评论

      本文标题:Channel使用案例分享

      本文链接:https://www.haomeiwen.com/subject/blfwpqtx.html