美文网首页Go基础系列随笔-生活工作点滴Go语言
15 Go并发编程(二):管道 —— Go并发的通信机制

15 Go并发编程(二):管道 —— Go并发的通信机制

作者: GoFuncChan | 来源:发表于2019-07-06 00:04 被阅读22次

    Go管道

    1.什么是管道?

    管道最早由CSP模型提出,以点对点管道代替内存共享实现并发进程间的数据交互,相比内存共享数据交互的相率要高很多。关于CSP模型在《Go并发编程初探》篇章已提到,这里不再赘述。

    在Go中,管道是一种特殊类型,用chan关键字表示。它的主要作用是在协程之间实现通信,可以说是go协程间的高速公路。

    管道声明:

    var {变量名} chan {数据类型}
    
    //声明一个整型管道chA
    var chA chan int
    

    管道创建:

    {接收变量名} := make(chan {数据类型},{管道容量})
    
    //使用make函数创建一个容量为5的整型管道
    chB := make(chan int,5)
    

    管道有几个特点:

    1.管道是有类型的,基于go的基本数据类型
    2.管道是有方向的,可从管道读出数据,可向管道写入数据。当管道被传递时可设置其读写方向
    3.管道是有缓存的,通过缓存容量可控制协程间的阻塞
    4.管道是可关闭的,且只能被关闭一次,管道也是资源,如果不使用应关闭
    

    管道使用常识:

    1.管道数据读写,管道写入数据后必须关闭;
    2.从一个已经关闭的管道中读取数据,读完了之后,继续读会读到其管道类型的零值;
    3.没有初始化的管道被关闭会报panic;
    4.读取管道数据时应校验其有效性;
    5.关闭管道时会产出一个广播,所有从管道读取数据的协程都会收到消息;
    6.被遍历的管道如果没有收到管道被关闭的广播,遍历会一直被阻塞;
    7.管道一般在写入协程处调用关闭,写只有一个写入协程的情况,管道被关闭后不能写入数据,但其他协程可以读出,注意重复关闭的情况;
    

    以下演示管道在协程间的通信作用:

    func BaseChannel01() {
        //声明一个管道:声明后没有初始化的管道是空指针nil,证明其为引用类型
        //PS:go的引用类型有五种:slice、map、chan、指针、接口,前三种都是直接可以用make()函数创建
        //var chA chan int
        //fmt.Println(chA)
    
        //初始化一个管道
        chB0 := make(chan int, 0)    //无缓存能力的整型管道
        chB1 := make(chan string, 5) //缓存能力为5的字符串管道
        chB2 := make(chan<- int, 3)  //缓存能力为3的整型只写管道
        chB3 := make(<-chan int, 3)  //缓存能力为3的整型只读管道
        fmt.Printf("chB0:类型%T,值%v\n", chB0, chB0)
        fmt.Printf("chB1:类型%T,值%v\n", chB1, chB1)
        fmt.Printf("chB2:类型%T,值%v\n", chB2, chB2)
        fmt.Printf("chB3:类型%T,值%v\n", chB3, chB3)
    
        //协程从通道不断的写入读出数据
        //开辟一个协程往通道里写数据
        go func(ch chan int) {
            //循环不断往通道里写入当前秒数
            for {
                now := time.Now()
                second := now.Second()
                ch <- second
            }
    
        }(chB0)
    
        //另一个协程不断从通道读出数据
        go func(ch chan int) {
            for {
                nowSecond := <-ch
                fmt.Println("子协程读出数据:", nowSecond)
                time.Sleep(time.Second)
            }
        }(chB0)
    
        //父协程不断从通道读出数据
        for {
            nowSecond := <-chB0
            fmt.Println("父协程读出数据:", nowSecond)
            time.Sleep(time.Second)
        }
    
        //读取管道数据时应校验其有效性
        fmt.Println("读取管道数据时校验其有效性:")
        chCheck := make(chan int, 3)
    
        chCheck <- 123
        chCheck <- 456
        chCheck <- 789
        close(chCheck)
    
        go func(ch chan int) {
            chc1 := <-chCheck
            chc2 := <-chCheck
            chc3 := <-chCheck
            chc4 := <-chCheck
            chc5 := <-chCheck
    
            fmt.Println(chc1, "-", chc2, "-", chc3, "-", chc4, "-", chc5)
    
            chc6, ok := <-chCheck
            fmt.Printf("chc5:值%v,有效性:%v\n", chc6, ok)
    
        }(chCheck)
    
        time.Sleep(time.Second * 3)
    
        //遍历管道,管道关闭以后遍历读取会自动被通知退出
        for v := range chCheck{
            time.Sleep(time.Second)
            fmt.Println(v)
        }
    
    }
    
    2.管道的读写和异常

    关闭管道的几个注意事项:

    • 不能关闭一个没有初始化的管道
    • 管道不能重复关闭
    • 不能往已经关闭的管道中写入数据,但是可以从中读数据
    func BaseChanner02() {
        var ch1 chan int
        close(ch1) // panic: close of nil channel
    
        ints1 := make(chan int, 1)
        ints1 <- 111
        close(ints1)
        close(ints1) //panic: close of closed channel]
    
        ints := make(chan int, 1)
        ints <- 111
        close(ints)
        ints <- 111 //panic: send on closed channel
    }
    
    3.无缓存的管道

    使用一个无缓存的管道时应该注意,它是阻塞的:

    • 没人读就永远写不进(阻塞)
    • 没人写就永远读不出(阻塞)
    func BaseChanner03() {
        chInt := make(chan int)
    
        go func(ch chan int) {
            fmt.Println("启动协程1")
            ch <- 111
            close(ch)
            fmt.Println("结束协程1")
        }(chInt)
    
        go func(ch chan int) {
            fmt.Println("启动协程2")
            fmt.Println("管道数据:", <-ch)
            fmt.Println("结束协程2")
        }(chInt)
    
        for {
            time.Sleep(time.Second)
        }
    
    }
    
    4.有缓存能力的管道

    可利用管道缓存能力进行协程调度,管道的元素个数或称缓存能力,决定协程是否产生阻塞,若管道数据已满则阻塞,写入阻塞读出也阻塞,这是相互的。

    func BaseChanner04() {
        //创建一个缓存能力为3的整型管道
        chInt := make(chan int, 3)
    
        go func(ch chan int) {
            fmt.Println("启动协程1")
            for i := 1; i <= 5; i++ {
                time.Sleep(time.Second * 2)
                fmt.Println("协程1写入数据:", i)
                ch <- i
            }
            fmt.Println("结束协程1")
        }(chInt)
    
        go func(ch chan int) {
            fmt.Println("启动协程2")
            for i := 1; i <= 5; i++ {
                num := <-ch
                fmt.Println("协程2读出数据:", num)
    
            }
            fmt.Println("结束协程2")
        }(chInt)
    
        for {
            time.Sleep(time.Second * 3)
        }
    
    }
    
    5.select选择管道,协程多路复用

    在讲到Go外壳:分支专题是提到select,select关键字是go特有的,其主要用于配合管道实现多路复用。

    func BaseChanner05() {
        //创建三个管道
        ch1 := make(chan int, 3)
        ch2 := make(chan int, 4)
        ch3 := make(chan int, 5)
    
        //创建3条协程
        go func(c chan int) {
            ticker := time.NewTicker(time.Second * 1)
            for {
                <-ticker.C
                c <- 1
            }
        }(ch1)
        go func(c chan int) {
            ticker := time.NewTicker(time.Second * 2)
            for {
                <-ticker.C
                c <- 2
            }
        }(ch2)
        go func(c chan int) {
            ticker := time.NewTicker(time.Second * 3)
            for {
                <-ticker.C
                c <- 3
            }
        }(ch3)
    
        //time.Sleep(time.Second)
        //主协程select多路复用,for不断获取不同管道的数据,随先来则优先处理谁。
        for {
            select {
            case chV1, ok := <-ch1:
                fmt.Printf("管道ch1输出:%v,有效性%v\n", chV1, ok)
            case chV2, ok := <-ch2:
                fmt.Printf("管道ch2输出:%v,有效性%v\n", chV2, ok)
            case chV3, ok := <-ch3:
                fmt.Printf("管道ch3输出:%v,有效性%v\n", chV3, ok)
        }
    }
    
    6.通过容量控制并发数

    利用有容量管道的阻塞能力——地铁闸机模型

    func BaseChanner06() {
        //创建一个容量为5的管道,无论协程开多少,控制每次5条并发
        semaphore := make(chan int, 5)
    
        for i := 1; i <= 100; i++ {
            go func(c chan int, n int) {
                for {
                    c <- i //抢通道写入,抢不到则阻塞
                    fmt.Println("协程", n, "抢到通道")
                    time.Sleep(time.Second)
                    <-c //做完操作后自己读出,空出容量
                }
            }(semaphore, i)
        }
    
        for {
            time.Sleep(time.Second)
        }
    
    }
    
    7.定时器

    固定时间和周期时长定时器,其与time.sleep()区别是可以终止和重置定时器。下面演示一下timer和ticker,以及在子协程中终止ticker

    func BaseChannel07() {
        //使用固定时间定时
        timer := time.NewTimer(time.Second * 3)
        <-timer.C
        fmt.Println("父协程定时3秒输出!!!")
        
        //简单的使用变量标识ticker状态
        var tickerStopped = false
        //使用周期定时器
        ticker := time.NewTicker(time.Second * 1)
        go func(t *time.Ticker) {
            //5秒后终止周期器
            //使用固定时间定时
            timer := time.NewTimer(time.Second * 5)
            <-timer.C
            fmt.Println("子协程定时5秒关闭周期器!!!")
            t.Stop()
            tickerStopped = true
            runtime.Goexit()
        }(ticker)
    
        for {
            if !tickerStopped {
                s := <-ticker.C
                fmt.Println("父协程每隔1秒输出!!!", s)
            } else {
                fmt.Println("子协程已关闭周期定时器!!!")
                os.Exit(0)
            }
    
        }
    
        //不可撤销的time.Sleep()
    
    }
    
    8.如何优雅的关闭管道

    比较优雅的方式一般建议在发送方关闭。

    协程对管道的操作分几种情况:

    • 一个发送者,一个接收者;

    在确认只有一个发送者的情况下,管道关闭时比较简单的,写入完成后记得立即或延迟关闭管道即可,接收者在读完所有数据后会校验读取数据的有效性。

    • 一个发送者,多个接收者;

    这种情况与上一种类似,如果其他多个协程的接受者在遍历读取,它们都会收到管道被关闭的广播,并退出退出遍历阻塞状态。

    • 多个发送者,一个接收者;

    这种情况比较复杂,各个发送者都不能粗暴的关闭管道,虽然可以通过sync.Once控制只关闭一次,但其他发送者仍有可能向一个已被关闭的管道发送数据。这里可能需要其他状态量标志各个发送者的完成状态,接收者监控该状态量确认各发送者写入完成后,由接收者关闭管道。可以使用等待组、状态变量、或其他通道等等。

    • 多个发送者,多个接收者;

    一般这种状况较少,多个协程一起对同一个通道进行读写,一般都需要一个管理者监控该通道的读写完成情况,如单独一个协程检测各个操作该通道的协程的完成状态,当全部读写完成后再进行管道关闭。

    相关文章

      网友评论

        本文标题:15 Go并发编程(二):管道 —— Go并发的通信机制

        本文链接:https://www.haomeiwen.com/subject/midyhctx.html