21-Channel

作者: 极客江南 | 来源:发表于2018-10-10 17:00 被阅读297次

    多线程同步问题

    • 互斥锁
      • 互斥锁的本质是当一个goroutine访问的时候, 其它goroutine都不能访问
      • 这样就能实现资源同步, 但是在避免资源竞争的同时也降低了程序的并发性能. 程序由原来的并发执行变成了串行
    • 案例:
      • 有一个打印函数, 用于逐个打印字符串中的字符, 有两个人都开启了goroutine去打印
      • 如果没有添加互斥锁, 那么两个人都有机会输出自己的内容
      • 如果添加了互斥锁, 那么会先输出某一个的, 输出完毕之后再输出另外一个人的
    package main
    import (
        "fmt"
        "sync"
        "time"
    )
    // 创建一把互斥锁
    var lock sync.Mutex
    
    func printer(str string)  {
        // 让先来的人拿到锁, 把当前函数锁住, 其它人都无法执行
        // 上厕所关门
        lock.Lock()
        for _, v := range str{
            fmt.Printf("%c", v)
            time.Sleep(time.Millisecond * 500)
        }
        // 先来的人执行完毕之后, 把锁释放掉, 让其它人可以继续使用当前函数
        // 上厕所开门
        lock.Unlock()
    }
    func person1()  {
        printer("hello")
    }
    func person2()  {
        printer("world")
    }
    func main() {
        go person1()
        go person2()
        for{
            ;
        }
    }
    

    生产者消费者问题

    • 所谓的生产者消费者模型就是
      • 某个模块(函数)负责生产数据, 这些数据由另一个模块来负责处理
      • 一般生产者消费者模型包含三个部分"生产者"、"缓冲区"、"消费者"
    • 为什么生产者消费者模型要含三个部分? 直接生产和消费不行么?
    • 一个案例说明一切
      • 生产者好比现实生活中的某个人
      • 缓冲区好比现实生活中的邮箱
      • 消费者好比现实生活中的邮递员
    • 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员, 那么如果将来过去的邮递员离职了, 你想邮寄信件必须想办法结识新的邮递员(消费者发生变化, 会直接影响生产者, 耦合性太强)
    • 如果在生产者和消费者之间添加一个缓冲区, 那么就好比有了邮箱, 以后邮寄信件不是找邮递员, 只需把信件投递到邮箱中即可, 写信的人不需要关心邮递员是谁(解耦)
    • 如果只有生产者和消费者, 那么每个人邮寄信件都需要直接找邮递员(1对1关系), 如果有10个人要邮寄信件, 那么邮递员只能依次找到每个人, 然后才能取件(效率低下)
    • 如果在生产者和消费者之间添加一个缓冲区, 那么所有的人只需要将信件投递到邮箱即可, 邮递员不用关心有多少人要邮寄信件, 也不用依次取件, 只需要找到邮箱从邮箱中统一取件即可(效率提高)
    • 如果只有生产者和消费者, 那么如果邮寄信件太多邮递员无法一次拿走, 这个时候非常难办
    • 如果在生产者和消费者之间添加一个缓冲区, 那么如果信件太多可以先拿走一部分, 剩下的继续放到邮箱中下次再拿
    • ... ...

    生产者和消费者资源竞争问题

    • 例如生产比较慢, 而消费比较快, 就会导致消费者消费到错误数据
    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    // 创建一把互斥锁
    var lock = sync.Mutex{}
    
    // 定义缓冲区
    var sce []int = make([]int, 10)
    
    // 定义生产者
    func producer(){
        // 加锁, 注意是lock就是我们的锁, 全局公用一把锁
        lock.Lock()
        rand.Seed(time.Now().UnixNano())
        for i:=0;i<10;i++{
            num := rand.Intn(100)
            sce[i] = num
            fmt.Println("生产者生产了: ", num)
            time.Sleep(time.Millisecond * 500)
        }
        // 解锁
        lock.Unlock()
    }
    // 定义消费者
    func consumer()  {
        // 加锁, 注意和生产者中用的是同一把锁
        // 如果生产者中已加过了, 则阻塞直到解锁后再重新加锁
        lock.Lock()
        for i:=0;i<10;i++{
            num := sce[i]
            fmt.Println("---消费者消费了", num)
        }
        lock.Unlock()
    }
    
    func main() {
        go producer()
        go consumer()
        for{
            ;
        }
    }
    
    • 思考: 那如果是一对多, 或者多对多的关系, 上述代码有问题么?

    管道(Channel)

    • 上述实现并发的代码中为了保持主线程不挂掉, 我们都会在最后写上一个死循环或者写上一个定时器来实现等待goroutine执行完毕
    • 上述实现并发的代码中为了解决生产者消费者资源同步问题, 我们利用加锁来解决, 但是这仅仅是一对一的情况, 如果是一对多或者多对多, 上述代码还是会出现问题
    • 综上所述, 企业开发中需要一种更牛X的技术来解决上述问题, 那就是管道(Channel)

    • Channel的本质是一个队列
    • Channel是线程安全的, 也就是自带锁定功能
    • Channel声明和初始化
      • 声明: var 变量名chan 数据类型
      • 初始化: mych := make(chan 数据类型, 容量)
      • Channel和切片还有字典一样, 必须make之后才能使用
      • Channel和切片还有字典一样, 是引用类型
    package main
    import "fmt"
    func main() {
        // 1.声明一个管道
        var mych chan int
        // 2.初始化一个管道
        mych = make(chan int, 3)
        // 3.查看管道的长度和容量
        fmt.Println("长度是", len(mych), "容量是", cap(mych))
        // 4.像管道中写入数据
        mych<- 666
        fmt.Println("长度是", len(mych), "容量是", cap(mych))
        // 5.取出管道中写入的数据
        num := <-mych
        fmt.Println("num = ", num)
        fmt.Println("长度是", len(mych), "容量是", cap(mych))
    }
    

    • 注意点:
      • 管道中只能存放声明的数据类型, 不能存放其它数据类型
      • 管道中如果已经没有数据, 再取就会报错
      • 如果管道中数据已满, 再写入就会报错
    package main
    
    import "fmt"
    
    func main() {
        // 1.声明一个管道
        var mych chan int
        // 2.初始化一个管道
        mych = make(chan int, 3)
    
        // 注意点: 管道中只能存放声明的数据类型, 不能存放其它数据类型
        //mych<-3.14
    
        // 注意点: 管道中如果已经没有数据, 
        // 并且检测不到有其它协程再往管道中写入数据, 那么再取就会报错
        //num = <-mych
        //fmt.Println("num = ", num)
    
        // 注意点: 如果管道中数据已满, 再写入就会报错
        mych<- 666
        mych<- 777
        mych<- 888
        mych<- 999
    }
    

    • 管道的关闭和遍历
    package main
    
    import "fmt"
    
    func main() {
        // 1.创建一个管道
        mych := make(chan int, 3)
        // 2.往管道中存入数据
        mych<-666
        mych<-777
        mych<-888
        // 3.遍历管道
        // 第一次遍历i等于0, len = 3,
        // 第二次遍历i等于1, len = 2
        // 第三次遍历i等于2, len = 1
        //for i:=0; i<len(mych); i++{
        //  fmt.Println(<-mych) // 输出结果不正确
        //}
    
        // 3.写入完数据之后先关闭管道
        // 注意点: 管道关闭之后只能读不能写
        close(mych)
        //mych<- 999 // 报错
    
        // 4.遍历管道
        // 利用for range遍历, 必须先关闭管道, 否则会报错
        //for value := range mych{
        //  fmt.Println(value)
        //}
    
        // close主要用途:
        // 在企业开发中我们可能不确定管道有还没有有数据, 所以我们可能一直获取
        // 但是我们可以通过ok-idiom模式判断管道是否关闭, 如果关闭会返回false给ok
        for{
            if num, ok:= <-mych; ok{
                fmt.Println(num)
            }else{
                break;
            }
        }
        fmt.Println("数据读取完毕")
    }
    

    • Channel阻塞现象
      • 单独在主线程中操作管道, 写满了会报错, 没有数据去获取也会报错
      • 只要在协程中操作管道过, 写满了就会阻塞, 没有就数据去获取也会阻塞
    package main
    import (
        "fmt"
        "time"
    )
    // 创建一个管道
    var myCh = make(chan int, 5)
    func demo()  {
        var myCh = make(chan int, 5)
        //myCh<-111
        //myCh<-222
        //myCh<-333
        //myCh<-444
        //myCh<-555
        //fmt.Println("我是第六次添加之前代码")
        //myCh<-666
        //fmt.Println("我是第六次添加之后代码")
    
        fmt.Println("我是第六次直接获取之前代码")
        <-myCh
        fmt.Println("我是第六次直接获取之后代码")
    }
    func test()  {
        //myCh<-111
        //myCh<-222
        //myCh<-333
        //myCh<-444
        //myCh<-555
        //fmt.Println("我是第六次添加之前代码")
        //myCh<-666
        //fmt.Println("我是第六次添加之后代码")
    
        //fmt.Println("我是第六次直接获取之前代码")
        //<-myCh
        //fmt.Println("我是第六次直接获取之后代码")
    }
    func example()  {
        time.Sleep(time.Second * 2)
        myCh<-666
    }
    func main() {
        // 1.同一个go程中操作管道
        // 写满了会报错
        //myCh<-111
        //myCh<-222
        //myCh<-333
        //myCh<-444
        //myCh<-555
        //myCh<-666
    
        // 没有了去取也会报错
        //<-myCh
    
        // 2.在协程中操作管道
        // 写满了不会报错, 但是会阻塞
        //go test()
    
        // 没有了去取也不会报错, 也会阻塞
        //go test()
    
        //go demo()
        //go demo()
        
        // 3.只要在协程中操作了管道, 就会发生阻塞现象
        go example()
        fmt.Println("myCh之前代码")
        <-myCh
        fmt.Println("myCh之后代码")
    
        //for{
        //  ;
        //}
    }
    

    • 利用Channel实现生产者消费者
    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    // 定义缓冲区
    var myCh = make(chan int, 5)
    var exitCh = make(chan bool, 1)
    
    // 定义生产者
    func producer(){
        rand.Seed(time.Now().UnixNano())
        for i:=0;i<10;i++{
            num := rand.Intn(100)
            fmt.Println("生产者生产了: ", num)
            // 往管道中写入数据
            myCh<-num
            //time.Sleep(time.Millisecond * 500)
        }
        // 生产完毕之后关闭管道
        close(myCh)
        fmt.Println("生产者停止生产")
    }
    // 定义消费者
    func consumer()  {
        // 不断从管道中获取数据, 直到管道关闭位置
        for{
            if num, ok := <-myCh; !ok{
                break
            }else{
                fmt.Println("---消费者消费了", num)
            }
        }
        fmt.Println("消费者停止消费")
        exitCh<-true
    }
    
    func main() {
        go producer()
        go consumer()
        fmt.Println("exitCh之前代码")
        <-exitCh
        fmt.Println("exitCh之后代码") 
    }
    

    • 无缓冲Channel
    package main
    import "fmt"
    var myCh1 = make(chan int, 5)
    var myCh2 = make(chan int, 0)
    func main() {
        // 有缓冲管道
        // 只写入, 不读取不会报错
        //myCh1<-1
        //myCh1<-2
        //myCh1<-3
        //myCh1<-4
        //myCh1<-5
        //fmt.Println("len =",len(myCh1), "cap =", cap(myCh1))
    
        // 无缓冲管道
        // 只有两端同时准备好才不会报错
        go func() {
            fmt.Println(<-myCh2)
        }()
        // 只写入, 不读取会报错
        myCh2<-1
        //fmt.Println("len =",len(myCh2), "cap =", cap(myCh2))
        // 写入之后在同一个线程读取也会报错
        //fmt.Println(<-myCh2)
        // 在主程中先写入, 在子程中后读取也会报错
        //go func() {
        //  fmt.Println(<-myCh2)
        //}()
    }
    

    • 无缓冲Channel和有缓冲Channel
      • 有缓冲管道具备异步的能力(写几个读一个或读几个)
      • 无缓冲管道具备同步的能力(写一个读一个)
    package main
    import (
        "fmt"
        "math/rand"
        "time"
    )
    // 定义缓冲区
    //var myCh = make(chan int, 0)
    var myCh = make(chan int)
    var exitCh = make(chan bool, 1)
    
    // 定义生产者
    func producer(){
        rand.Seed(time.Now().UnixNano())
        for i:=0;i<10;i++{
            num := rand.Intn(100)
            fmt.Println("生产者生产了: ", num)
            // 往管道中写入数据
            myCh<-num
            //time.Sleep(time.Millisecond * 500)
        }
        // 生产完毕之后关闭管道
        close(myCh)
        fmt.Println("生产者停止生产")
    }
    // 定义消费者
    func consumer()  {
        // 不断从管道中获取数据, 直到管道关闭位置
        for{
            if num, ok := <-myCh; !ok{
                break
            }else{
                fmt.Println("---消费者消费了", num)
            }
        }
        fmt.Println("消费者停止消费")
        exitCh<-true
    }
    
    func main() {
        go producer()
        go consumer()
        fmt.Println("exitCh之前代码")
        <-exitCh
        fmt.Println("exitCh之后代码")
    }
    

    IO的延迟说明:
    看到的输出结果和我们想象的不太一样, 是因为IO输出非常消耗性能, 输出之后还没来得及赋值可能就跑去执行别的协程了


    • 单向管道和双向管道
      • 默认情况下所有管道都是双向了(可读可写)
      • 但是在企业开发中, 我们经常需要用到将一个管道作为参数传递
      • 在传递的过程中希望对方只能单向使用, 要么只能写,要么只能读
    • 双向管道
      • var myCh chan int = make(chan int, 0)
    • 单向管道
      • var myCh chan<- int = make(chan<- int, 0)
      • var myCh <-chan int = make(<-chan int, 0)
    • 注意点:
      • 双向管道可以自动转换为任意一种单向管道
      • 单向管道不能转换为双向管道
    package main
    
    import "fmt"
    
    func main() {
        // 1.定义一个双向管道
        var myCh chan int = make(chan int, 5)
    
        // 2.将双向管道转换单向管道
        var myCh2 chan<- int
        myCh2 = myCh
        fmt.Println(myCh2)
        var myCh3 <-chan int
        myCh3 = myCh
        fmt.Println(myCh3)
    
        // 3.双向管道,可读可写
        myCh<-1
        myCh<-2
        myCh<-3
        fmt.Println(<-myCh)
        
        // 3.只写管道,只能写, 不能读
        //  myCh2<-666
        //  fmt.Println(<-myCh2)
    
        // 4.指读管道, 只能读,不能写
        fmt.Println(<-myCh3)
        //myCh3<-666
        
        // 注意点: 管道之间赋值是地址传递, 以上三个管道底层指向相同容器
    }
    
    • 单向管道作为函数参数
    package main
    import (
        "fmt"
        "math/rand"
        "time"
    )
    // 定义生产者
    func producer(myCh chan<- int){
        rand.Seed(time.Now().UnixNano())
        for i:=0;i<10;i++{
            num := rand.Intn(100)
            fmt.Println("生产者生产了: ", num)
            // 往管道中写入数据
            myCh<-num
            //time.Sleep(time.Millisecond * 500)
        }
        // 生产完毕之后关闭管道
        close(myCh)
        fmt.Println("生产者停止生产")
    }
    // 定义消费者
    func consumer(myCh <-chan int)  {
        // 不断从管道中获取数据, 直到管道关闭位置
        for{
            if num, ok := <-myCh; !ok{
                break
            }else{
                fmt.Println("---消费者消费了", num)
            }
        }
        fmt.Println("消费者停止消费")
    
    }
    
    func main() {
        // 定义缓冲区
        var myCh = make(chan int, 5)
        go producer(myCh)
        consumer(myCh)
    }
    

    select选择结构

    • select是Go中的一个控制结构,类似于switch语句,用于处理异步IO操作
      • 如果有多个case都可以运行,select会随机选出一个执行,其他不会执行。
      • 如果没有可运行的case语句,且有default语句,那么就会执行default的动作。
      • 如果没有可运行的case语句,且没有default语句,select将阻塞,直到某个case通信可以运行
        select {
        case IO操作1:
            IO操作1读取或写入成功就执行
        case IO操作2:
            IO操作2读取或写入成功就执行
        default:
            如果上面case都没有成功,则进入default处理流程
        }
    
    • 注意点:
      • select的case后面必须是一个IO操作
      • 一般情况下使用select结构不用写default
    package main
    
    import (
        "fmt"
        "time"
    )
    func main() {
        // 创建管道
        var myCh = make(chan int)
        var exitCh = make(chan bool)
    
        // 生产数据
        go func() {
            for i:=0;i <10;i++{
                myCh<-i
                time.Sleep(time.Second)
            }
            //close(myCh)
            exitCh<-true
        }()
    
        // 读取数据
        for{
            fmt.Println("读取代码被执行了")
            select {
            case num:= <-myCh:
                fmt.Println("读到了", num)
            case <-exitCh:
                //break // 没用, 跳出的是select
                return
            }
            fmt.Println("-----------")
        }
    }
    
    • select应用场景
      • 实现多路监听
      • 实现超时处理
    package main
    import (
        "fmt"
        "runtime"
        "time"
    )
    
    func main() {
        // 1.创建管道
        myCh := make(chan int, 5)
        exitCh := make(chan bool)
    
        // 2.生成数据
        go func() {
            for i:=0; i<10; i++ {
                myCh<-i
                time.Sleep(time.Second * 3)
            }
        }()
    
        // 3.获取数据
        go func() {
            for{
                select {
                case num:= <-myCh:
                    fmt.Println(num)
                case <-time.After(time.Second * 2):
                    exitCh<-true
                    runtime.Goexit()
                }
            }
        }()
    
        <-exitCh
        fmt.Println("程序结束")
    }
    

    定时器补充

    • 一次性定时器
    • NewTimer函数
      • func NewTimer(d Duration) *Timer
      • NewTimer创建一个Timer,它会在到期后向Timer自身的C字段发送当时的时间
    type Timer struct {
        C <-chan Time // 对于我们来说, 这个属性是只读的管道
        r runtimeTimer
    }
    
    package main
    import (
        "fmt"
        "time"
    )
    func main() {
        start := time.Now()
        fmt.Println("开始时间", start)
        timer := time.NewTimer(time.Second * 3)
        fmt.Println("读取之前代码被执行")
        end := <-timer.C // 系统写入数据之前会阻塞
        fmt.Println("读取之后代码被执行")
        fmt.Println("结束时间", end)
    }
    
    • After函数
      • func After(d Duration) <-chan Time
      • 底层就是对NewTimer的封装, 只不过返回值不同而已
    func After(d Duration) <-chan Time {
        return NewTimer(d).C
    }
    
    package main
    import (
        "fmt"
        "time"
    )
    func main() {
        start := time.Now()
        fmt.Println("开始时间", start)
        timer := time.After(time.Second * 3)
        fmt.Println("读取之前代码被执行")
        end := <-timer // 系统写入数据之前会阻塞
        fmt.Println("读取之后代码被执行")
        fmt.Println("结束时间", end)
    }
    

    • 周期性定时器
    • NewTicker函数
      • func NewTicker(d Duration) *Ticker
      • 和NewTimer差不多, 只不过NewTimer只会往管道中写入一次数据, 而NewTicker每隔一段时间就会写一次
    type Ticker struct {
        C <-chan Time // 周期性传递时间信息的通道
        // 内含隐藏或非导出字段
    }
    
    package main
    import (
        "fmt"
        "time"
    )
    func main() {
        // 1.创建一个周期定时器
        ticker := time.NewTicker(time.Second)
        // 2.不断从重启定时器中获取时间
        for{
            t := <-ticker.C // 系统写入数据之前会阻塞
            fmt.Println(t)
        }
    }
    

    相关文章

      网友评论

        本文标题:21-Channel

        本文链接:https://www.haomeiwen.com/subject/wsqyoftx.html