美文网首页
Go语言中的并发进程以及通道通信

Go语言中的并发进程以及通道通信

作者: threadtag | 来源:发表于2020-03-18 16:57 被阅读0次

    goroutine

    在go语言中要并发几个进程很简单

    1. 定义一个函数
    2. 用go运行一下
      例如下面的例子,主程序main开出一个goroutine
    package main
    import(
        "fmt"
    )
    
    func f(){
        for i:=1;i<100;i++{
            if i % 7==0 {
                fmt.Println(i)
            }
         }
    }
    
    func main(){
        fmt.Println("Started in main process")
        fmt.Println("forking to goroutine")
        go f()
        fmt.Println("the main process ends here")
    }
    

    这样一段简单代码,拿去一跑,死的很难看,7的倍数一个都没有打印出来,程序就跑完了。原因是主程序还没有等f跑完就回收内存,卷铺盖走人了.
    正确的方式是,在主程序中加入一个开关,当开出goroutine后,按下开关,然后就在那边等goroutine按起开关

    package main
    import(
        "fmt"
        "sync"
    )
    func f(wg *sync.WaitGroup){
        for i:=1;i<100000000;i++{
            if i % 7==0 {
                fmt.Println(i)
            }
         }
        wg.Done() //按起开关
    }
    
    func main(){
        var wg sync.WaitGroup //开关
        fmt.Println("Started in main process")
        fmt.Println("forking to goroutine")
        wg.Add(1) //按下开关
        go f(&wg)
        wg.Wait() // 等待
        fmt.Println("the main process ends here")
    }
    

    这里我们看到了简单的进程间协同通讯,用的是一个共享的指针。在Go中,进程间可以通过通道channel来传输数据。
     

    Go Channel 语义

    channel 变量的声明通过make来生成

    ch := make(chan int) // 无缓冲的channel
    ch := make(chan int, 3) // 缓冲量为3的channel, 0 表示没有buffer的channel
    

    对channel变量的操作,有发送和接收以及关闭3种.
    接收和发送两种操作都是用向左箭头表示(<-) ,两种不同的意思通过channel变量与<-左右关系来表示。

    ch <- x   //发送操作
    x = <-ch // 接收操作
    

    从字面的意思来说,通道是中间路带,发送和接收两种操作的主语应该是另外一个变量(上面的x).

    channel的send操作
    channel的receive操作

     

    channel 的buffer

    我们可以把无缓冲的通道想像成,一手给另外一手接。


    无缓冲的管道

    把有缓冲的通道想想成一条船,有5个缓冲的通道一次可以载5人。


    有缓冲的管道
    ch :=make(cha int)      // unbuffered channel 
    pool :=make(chan int,2)   // buffered channel
    

    无缓冲channel的阻塞性

    无缓冲的通道不管是send操作还是receive操作都会阻塞程序。在一个goroutine里面上数据send入管道,在另外一个goroutine里面接收这个数据前,前面的那个goroutine会一直在那边等,直至另外一个goroutine里开始receive数据。同样的,在一个goroutine里面进行一次receive 操作的话,如果没有其他一个goroutine进行send 操作的话,前面那个goroutine会一直在那边等。下面程序,将同一个通道的send和receive操作一起放在主程序里面跑,编译能通过,但是运行会出现下面的错误:

    fatal error: all goroutines are asleep - deadlock!

    package main
    import(
        "fmt"
    )
    
    func main(){
        pool :=make(chan int,0) // unbuffered channel
        fmt.Println("sending a number to channel")
        pool<- 10
        fmt.Println("getting it back")
        <-pool  
        // running deadlock
    }
    

    将pool 的buffer size 改成1 就能正常运行。
     

    有缓冲channel 的阻塞性

    • send操作(ch <- x)相当于将变量x的值放入管道,当管道还没有满的时候,该操作会立即执行,程序不会被阻塞在这里。当管道满时, 程序就会阻塞在这边,等待通道里面数据被取走(接收)一个后,就可以进行这里的send操作了。如果有多个等待send操作的语句,系统会安排这边被阻塞的send操作是否被执行。这边所说的系统是go 的运行时(runtime),它负责程序的总调度。
    • Receive操作(x := <-ch) 相当于将管道队列里面的值取出来,腾空管道里面的位置。当管道里面不是空的时候,该操作会立即执行,程序继续往下执行。当管道被取空时,程序在这里阻塞,等待有新的值被send入管道。同样,多个等待receive操作的语句,会由go的运行时决定安排谁率先获得操作权。
    package main
    import(
        "fmt"
        "time"
    )
    func main(){
        pool :=make(chan int,2) // buffered channel
        go func(){
            pool <- 1 //feed pool with 1 value
            fmt.Println("sending 1")
        }()
    
        go func(){
            pool <- 2
            fmt.Println("sending 2")
        }()
    
        go func(){
            pool <- 3
            fmt.Println("sending 3")
        }()
    
        fmt.Println("=")
        time.Sleep(1 * time.Second)
        
        x := <-pool
        fmt.Printf("%d received\n",x)
        
        fmt.Println("==")
        time.Sleep(2 * time.Second)
        x  = <-pool 
        fmt.Printf("%d received\n",x)
        
        fmt.Println("=====")
        time.Sleep(5 * time.Second) 
        x  = <-pool 
        fmt.Printf("%d received\n",x)
        
        fmt.Println("=")
        time.Sleep(1 * time.Second) 
        close(pool)
    }
    

    这里用3个go语句来开启3个goroutine, 由于3个goroutine 可以看成3个独立的线程,3者中哪个先打印出来顺序是随机的。
    调整buffer size, 程序有不同的反应, 当 buffer size 为0 时,输出结果可能是这样的:

    =
    sending 1
    1 received
    ==
    sending 2
    2 received
    =====
    3 received
    =
    sending 3

    当buffer size 为1时:

    sending 1
    =
    sending 2
    1 received
    ==
    2 received
    =====
    sending 3
    3 received
    =

    当buffer size 为2时:

    =
    sending 2
    sending 1
    1 received
    sending 3
    ==
    2 received
    =====
    3 received
    =

    当buffer size 为3时:

    =
    sending 2
    sending 1
    sending 3
    1 received
    ==
    2 received
    =====
    3 received
    =

     

    channel range 枚举操作

    用for range 操作可以循环不断的从一个通道中receive 数据,直到该通道关闭。如果发送端再也不发送数据,而通道中的数据被取完并且通道没有被关闭,程序会报deadlock错误,停止工作。

    package main
    import(
        "fmt"
        "time"
    )
    func main(){
        pool :=make(chan int) // unbuffered channel
        go func(){      
            for x :=0;x<10;x++{
                fmt.Printf("sending:%d\n",x)
                pool <- x
                time.Sleep(1 * time.Second)
            }
            close(pool) //重要
        }()
    
        for x  := range pool{ 
            // this loop will continue to fetch data from the channel until channel is closed
            // when there is no feeding on the sender side and channel is not closed
            // fetching data from channel will encounter deadlock panic
            fmt.Println( x*x)       
        }   
    
    }
    

    其中

    for x  := range pool{  
    ...
    }
    

    相当于

    for{
        x, ok := <-pool
        if !ok{  //channel was closed and drained
              break
        }
    .... 
    }
    //当通道上的数据都取完,并已关闭时, 在从通道上取数据会得到零值(zero value)
    // 同时上面的ok会得到false值
    // 本段代码来自于《go programming language》
    

     

    关闭的channel

    1. 当用close(ch)语句关闭后,如果再用x = <-ch来接收数据将得到零值,而for x:=range {}语句则不会执行。如上面代码所示
    2. 在关闭的channel上执行send操作,会导致程序报错退出。
    3. 在一个未关闭的通道上,执行receive操作次数(包括range操作)大于send的次数,将会导致程序报错退出。

     

    select添加监听器

    当通道用于goroutine的调度时,可以使用下面的循环模式,配合select语句来监听来自不同通道的数据通讯。

    for{
        select{
            case <-ch1
             //...
            case x:=<-ch2
            //...
           case ch3 <-y
            default:
            //...
        }
        if ...{
            break
        }
    }
    

    select语句会等待这些通道发出动静,其中任意一个通道上有通讯时,就触发这个case下面的语句,其他case下语句保持不变。一个没有case的select{}会永远等待。

     

    单一方向的channel

    如果通道作为函数的参数是,可以指明该通道的方向。这样在函数内部对通道只能进行send或receive操作。有了这种限定,会在编译时就能发现代码的错误。

    func counter(output chan<-int, input <-chan int){
         for v :=range input{
               output <-x    
         }
         close(output)
    }
    // 例子来自《Go programming language》
    

     

    例子1: 每0.1秒出一个字符的随机字符生成器

    time.Tick函数返回一个通道,该通道能够周期性的发出信号来。

    package main
    import (
        "fmt"
        "math/rand"
        "time"
    
    )
    func main(){
        tick :=time.Tick(100* time.Millisecond)
        rand.Seed(time.Now().Unix())
        for n:=0;n<10;n++{
            select{
            case <-tick:
                var s int
                for{
                    s = rand.Intn(122)
                    if (s>64 && s<91)|| (s>96 && s<123){  // 字母的ASCII值
                         break
                    }
                }
                fmt.Printf("%c",s) 
            } 
            if n>10{
                break
            }
        } 
        fmt.Println("")
    }
    

     

    例子2:多线程处理复杂计算任务的worker模型

    其基本思想如下图:


    worker模型
    package main
    // 本程序模拟多个线程(worker)共同完成多个任务(产生随机字符)
    import (
        "fmt"
        "time"
        "math/rand"
    )
    
    func random_char() int{
        var s int
        for{
            s = rand.Intn(123)
            if (s>64 && s<91)|| (s>96 && s<123){  // 字母的ASCII值
                 break
            }
        }
        return s
    }
    
    func worker(id int, jobs <-chan int, results chan<- int){
        // heavy work goes in here
        for n := range jobs {   
            time.Sleep(time.Duration(n*50) * time.Millisecond)
            fmt.Printf("worker  %d finished\n",id)
            results<- random_char()
        }
    }
    
    func main(){
        n_threads:=5;// 线程数
        n_jobs:=1000;// 任务数
        var jobs_list =make([]int,n_jobs)
        var results_list = make([]int, n_jobs)
        for i:=0;i<n_jobs;i++{
            // 生成任务
            jobs_list[i]=rand.Intn(3)
        }
            
        //开启通道
        buffer_size:=10
        jobs :=make(chan int,buffer_size)
        results:=make(chan int,buffer_size)
    
        rand.Seed(time.Now().Unix())
        for w := 1; w <= n_threads; w++ {
            // 开启多个线程
            go worker(w,jobs, results)
        }
    
        // 分发第一批任务
        var i int // i为分发掉任务的计数器
        for i=0; i<buffer_size;i++ {
            jobs <- jobs_list[i]
        }
    
        finished :=0    
        for {
            // 当全部完成时,停止
            if finished >= n_jobs{
                break
            }
    
            //循环监测任务的完成情况, 当有任务完成时, 再次分发没有完成的任务
            select{
            case r:=<-results:
                results_list[finished]=r            
                finished ++         
                if i<n_jobs{                
                    jobs <- jobs_list[i] //分发没有完成的任务
                    i++;
                }
            }
        }
        close(jobs)
        close(results)
    
        for i=0;i<len(results_list);i++{
            fmt.Printf("%c",results_list[i])        
        }
        fmt.Println("")
    }
    

    参考

    Go Programming language
    Go实现线程池
    Go by example

    初学者,请多指教
    原创内容,转载请注明 copywrite by threadtag

    相关文章

      网友评论

          本文标题:Go语言中的并发进程以及通道通信

          本文链接:https://www.haomeiwen.com/subject/bpftyhtx.html