Go select

作者: JunChow520 | 来源:发表于2021-04-13 19:21 被阅读0次

    通过select语句可以监听channel上的数据流动

    Golang的select语句类似于UNIX的select()函数的轮询机制,在UNIX中可通过调用select()函数来监控一系列的文件句柄(文件描述符,数量有限),一旦某个文件句柄发生了I/O动作select()调用就会被返回。该机制也被用于高并发的Socket服务器程序中。

    select来源于网络I/O模型中的select,本质上I/O多路复用技术,只不过Golang中的select基于的并非网络而是channel

    select {
      case communication clause:
        statement(s)
      case communication clause:
        statement(s)
      default:
        statement(s)
    }
    

    select语句是Golang中的控制结构,类似用于通信的switch语句,也被称为channel开关。select语句会等待channel准备就绪(收发操作),以便在不同的case下执行。由select开始的一个新的选择块,每个选择条件由case语句来描述。与switch语句相比,select有较多限制,最大的限制在于每个case语句必须是一个I/O操作。

    select{
    case v1 := <-ch1:
        fmt.Printf("[CH1] received: %v\n", v1)
    case v2,ok := <-ch2:
        if ok{
            fmt.Printf("[CH2] received: %v\n", v2)
        }else{
            fmt.Printf("[CH2] closed\n")
        }
    case ch3 <-msg:
        fmt.Printf("[CH3] sent: %v\n", msg)
    default:
        fmt.Printf("NO Communicating\n")
    }
    

    例如:上例select语句拥有四个case子句,前两个是channelreceive接收操作,第三个是channelsend发送操作,最后一个default默认操作。当代码执行到select语句时,case子句会按源代码的顺序进行评估,且只评估一次。评估结果会出现下面几种情况:

    1. default外,若只有一个case评估通过则执行此case中的语句。
    2. default外,若用多个case评估通过则随机挑选一个执行。
    3. default外,所有的case评估都不通过,则执行default
    4. 若没有defaultselect代码块发生阻塞,直到有一个case通过评估,否则一直阻塞。
    5. casereceive接收的是nil则也会发生阻塞

    监听

    Golang的select用来监听和channel有关的I/O操作,可监听进入channel时的数据,也可以是用channel发送值时。当I/O操作发生时会触发相应地动作,因此每个case都必须是一个I/O操作,确切的说应该是一个面向channel的I/O操作。

    例如:定时器

    ticker1 := time.NewTicker(time.Second * 1)
    ticker2 := time.NewTicker(time.Second * 3)
    
    for{
        select{
        case <-ticker1.C:
            fmt.Printf("[1] TICK\n")
        case <-ticker2.C:
            fmt.Printf("[2] TICK\n")
        }
    }
    

    在执行select语句时,运行时系统会自上而下地判断每个case中的发送或接收操作是否可以被立即执行,所谓立即执行即当前goroutine不会因此操作而被阻塞。

    select语句只能用于channel的读写操作

    例如:使用select来检测channel是否已满

    ch := make(chan int, 1)
    ch<-1
    
    select{
    case ch<-2:
        fmt.Printf("send to channel\n")
    default:
        fmt.Printf("channel is full\n")
    }
    

    特性

    • 每个case都必须是一个channel
    • 所有channel表达式都会被求值
    • 所有被发送的表达式都会被求值
    • 若任意某个channel可执行就执行,其它被忽略。

    例如:获取斐波拉兹数列

    func fib(ch, quit chan int){
        x,y := 0,1
        for{
            select{
            case ch <- x:
                x,y = y, x+y
            case <-quit:
                fmt.Printf("QUIT\n")
                return
            }
        }
    }
    
    func main(){
        ch := make(chan int)
        quit := make(chan int)
    
        go func(){
            for i:=0; i<10; i++{
                fmt.Println(<-ch)
            }
            quit<-0
        }()
    
        fib(ch, quit)
    }
    

    case

    • select中每个case必须是一个channel操作,要么是发送要么是接收。

    select执行过程中必须命中某一case分支,若在遍历所有case后都没有命中,则会进入default分支。若没有default分支则select发生阻塞,直到某个case可以命中。若一直都没有命中,则select抛出deadlock死锁错误。

    • 循环中每次select都会对所有channel表达式求值

    例如:通过time.After实现定时器,定时任务可通过done channel停止。

    done := make(chan bool, 1)
    close(done)
    for{
        select{
        case <-time.After(time.Second):
            fmt.Printf("Time after\n")
        case <-done:
            //读取零值 false
            fmt.Printf("Read done\n")
        }
    }
    
    • 若多个case满足读写条件,select会随机选择一个case来执行。

    select会随机执行一个可运行的case,若没有case可以运行则阻塞,直到有case可以运行。

    select可用于多个channel进行读写操作时仅需一次只处理一个的情况。

    ch := make(chan int, 1024)
    go func(ch chan int){
        for{
            v := <-ch
            fmt.Printf("value = %v\n", v)
        }
    }(ch)
    
    ticker := time.NewTicker(time.Second * 1)
    for i:=0; i<5; i++{
        select{
        case ch<-i:
        case <-ticker.C:
            fmt.Printf("%d: Ticker\n", i)
        }
        time.Sleep(time.Microsecond * 500)
    }
    
    close(ch)
    ticker.Stop()
    

    ticker.Cch同时满足读写条件时,select会随机地选择一个来执行,导致看起来一些数据丢了。

    • 对于case条件语句中若存在channel值为nil的读写操作,则该分支会被忽略。
    var ch chan int
    
    go func(ch chan int){
        ch <- 100
    }(ch)
    
    select {
    case <-ch:
        fmt.Printf("Channel recieved\n")
    }
    

    发生错误:fatal error: all goroutines are asleep - deadlock!

    default

    select语句会被阻塞,直到其中一个case被执行。若select中没有任何case,它将永远阻塞,从而导致死锁。

    例如:空select{}引发死锁

    func main(){
        select {
        
        }
    }
    

    对于空的select语句,程序会被阻塞,准确来说是当前goroutine会被阻塞。Golang自带死锁检测机制,发现当前goroutine再也没有机会被唤醒时,则会panic

    fatal error: all goroutines are asleep - deadlock!
    

    通过带defaultselect实现非阻塞读写,用于防止select发生阻塞。

    select{
      default:
    }
    

    多个case运行时select会随机公平地选出一个执行,其它不会执行。若存在default子句则会执行该语句。若没有defaultselect阻塞,直到某个通信可以运行。Go不会重新对channel或值进行求值。

    select语句永远阻塞,没有其它goroutine写入此channel时,将导致死锁。

    ch := make(chan int)
    select{
    case <-ch:
    }
    
    fatal error: all goroutines are asleep - deadlock!
    

    若存在默认情况default则不会发生死锁deadlock,因为在没有其它case准备就绪时将执行default默认情况。

    ch := make(chan int)
    select{
    case <-ch:
    default:
        fmt.Println("default case executed")
    }
    

    例如:典型生产者消费者模式

    func main(){
        ch1 := make(chan int)
        ch2 := make(chan int)
        //生产者
        go pump1(ch1)
        go pump2(ch2)
        //消费者
        go suck(ch1, ch2)
    
        time.Sleep(1e9)
    }
    
    • ch1ch2在无限循环中通过pump1()pump2()填充整数
    func pump1(ch chan int){
        for i:=0; ; i++{
            ch <- i * 1
        }
    }
    func pump2(ch chan int){
        for i:=0; ; i++{
            ch <- i * 2
        }
    }
    
    • suck()在无限循环中轮询输入项,通过select语句获取不同信道的整数并输出。
    func suck(ch1,ch2 chan int){
        for{
            select{
            case v := <-ch1:
                fmt.Printf("[CH1] receive %d\n", v)
            case v := <-ch2:
                fmt.Printf("[CH2] receive %d\n", v)
            default:
                fmt.Printf("NO Communicating\n")
            }
        }
    }
    

    选择select的哪一个case取决于哪个信道接收到了消息。

    timeout

    case中的channel始终没有接收到数据,同时也没有提供default语句时,select语句整体会发生阻塞。有时并不希望select一直阻塞下去,此时可手动设置一个超时时间。

    func expire(ch chan bool, t int){
        time.Sleep(time.Second * time.Duration(t))
        ch <- true
    }
    
    func main(){
        timeout := make(chan bool, 1)
        go expire(timeout, 2)
    
        ch1 := make(chan string, 1)
        ch2 := make(chan string, 1)
        select{
        case msg1 := <-ch1:
            fmt.Printf("[CH1] received: %s\n", msg1)
        case msg2 := <-ch2:
            fmt.Printf("[CH2] received: %s\n", msg2)
        case <-timeout:
            fmt.Printf("[EXPIRE] exit\n")
        }
    }
    

    例如:使用select实现channel的读取超时机制,不能使用default否则3秒超时未到,就会直接执行default

    timeout := make(chan bool, 1)
    
    go func(){
        time.Sleep(time.Second * 3)
        timeout <- true
    }()
    
    ch := make(chan int)
    select{
    case <-ch:
    case <-timeout:
        fmt.Printf("TIMEOUT\n")
    }
    

    可使用time.After实现超时控制

    ch := make(chan int)
    
    select{
    case <-ch:
        fmt.Printf("read from ch\n")
    case <-time.After(time.Second * time.Duration(3)):
        fmt.Printf("[TIMEOUT] exit\n")
    }
    

    for

    forselect结合时,break是无法跳出for之外的,若需break出来需添加标签使用goto,或break到具体为止。

    • 解决方案1:使用Golang中break的特性在外层for上添加一个标签
    • 解决方案2:使用goto直接跳出循环到指定标记位置
    • 对于for中空的select{}也有可能会引起CPU占用过高的问题
    ch := make(chan bool)
    for i:=0; i<runtime.NumCPU(); i++{
        go func(){
            for{
                select{
                case <-ch:
                    break
                default:
                }
            }
        }()
    }
    
    time.Sleep(time.Second * 10)
    for i:=0; i<runtime.NumCPU(); i++{
        ch<-true
    }
    

    一般来说,使用select监听各个case的I/O事件,每个case都是阻塞的。上例中原本希望select在获取到ch里的数据时立即退出循环,但由于在for循环中,第一次读取ch后仅仅退出了select但并未退出for,因此下次哈希继续执行select逻辑,此时将永远是执行default,直到ch里读取到数据。否则会一直在一个死循环中运行,因此即便只是放到一个goroutine中运行,也会占满所有的CPU。解决的方式直接把default拿掉,这样select会一直阻塞在ch通道的I/O上,当ch有数据时就可以随时响应通道中的信息。

    select实现了一种监听模式,通常用在(无限)循环中,在某中情况下可通过break语句使循环退出。

    ch := make(chan int)
    //定时2s
    ticker := time.NewTicker(time.Second * 2)
    defer ticker.Stop()
    //发送信号
    go func(ch chan int){
        time.Sleep(time.Second * 5)
        ch <- 1
    }(ch)
    //监听I/O
    for{
        select{
        case <-ticker.C:
            fmt.Printf("task running...\n")
        case result,ok := <-ch:{
            if ok{
                fmt.Printf("chan number is %v\n", result)
                break
            }
        }
        }
    }
    fmt.Printf("END\n")
    

    例如:

    ch := make(chan int)
    quit := make(chan bool)
    
    //写数据
    go func() {
        //循环写入
        for i := 0; i < 5; i++ {
            ch <- i
            time.Sleep(time.Second)
        }
        //关闭channel
        close(ch)
        //通知主goroutine推出
        quit <- true
        //退出当前goroutine
        runtime.Goexit()
    }()
    
    //主goroutine 读数据
    for {
        select {
        case num := <-ch:
            fmt.Printf("received number is %d\n", num)
        case <-quit:
            fmt.Printf("quit\n")
            //break //跳出select
            return //终止进程
        }
        fmt.Printf("==================\n")
    }
    

    多路复用

    select是Golang在语言层面提供的多路I/O复用机制,它可检测多个channel是否ready(是否可读或可写)。

    select是如何实现多路复用的,为什么没有在第一个channel操作时阻塞,从而导致后面的case都执行不了。

    相关文章

      网友评论

          本文标题:Go select

          本文链接:https://www.haomeiwen.com/subject/rmrfxltx.html