Golang笔记-- 并发模式

作者: 奶爸撸代码 | 来源:发表于2019-08-19 00:03 被阅读8次

    并发模式

    [TOC]

    并发程序指同时进行多个任务的程序, Go程序一种支持并发的方式是通过goroutine和channel, 支持“顺序通信进程”(communicating sequential processes)或被简称为CSP.

    CSP是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中.

    goroutines

    • Golang中并发的执行单元叫goroutine, 可类比为线程, 或协程.
    • 程序启动时main函数在main goroutine中运行
    • go语句创建新的goroutine, 多个goroutine并发执行:
    go f()
    go func() {
       }()
    
    • 一个简单的并发网络模型:
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err) // e.g., connection aborted
            continue
        }
        go handleConn(conn) // handle connections concurrently
    }
    
    func handleConn(c net.Conn) {
        defer c.Close()
        for {
            _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
            if err != nil {
                return // e.g., client disconnected
            }
            time.Sleep(1 * time.Second)
        }
    }
    
    • Echo 服务器
    func echo(c net.Conn, shout string, delay time.Duration) {
        fmt.Fprintln(c, "\t", strings.ToUpper(shout))
        time.Sleep(delay)
        fmt.Fprintln(c, "\t", shout)
        time.Sleep(delay)
        fmt.Fprintln(c, "\t", strings.ToLower(shout))
    }
    
    func handleConn(c net.Conn) {
        input := bufio.NewScanner(c)
        for input.Scan() {
            echo(c, input.Text(), 1*time.Second)
        }
        // NOTE: ignoring potential errors from input.Err()
        c.Close()
    }
    
    // client
    func main() {
        conn, err := net.Dial("tcp", "localhost:8000")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        go mustCopy(os.Stdout, conn)
        mustCopy(conn, os.Stdin)
    }
    

    Goroutines和线程

    • 量的区别, 动态栈: os线程固定大小(一般2M), goroutines小(一般2k)且不固定, 动态伸缩,最大1G
    • 调度, os没几毫秒由scheduler内核函数调度, goroutines由go自己大调度器调度,如m:n调度方法在n个线程调度m个goroutines, 且不需要进入内核
    • GOMAXPROCS大变量决定调度到多少个os线程

    channels

    • channels是goroutines之间通信机制, 通过channel从一个goroutine向另一个发送消息
    • chan定义某种类型的channel, 类比C/C++的一个支持多线程的泛型queue
    • 和map类似,channel也对应一个make创建的底层数据结构的引用: make(chan int)
    • 和make,slice等引用类型一样, channel的零值也是nil
    • 发送和接收两个操作都使用<-运算符
    • 支持close操作, 向close后的channel发送会panic, 但仍然可接收chan里的数据, 如没有数据将产生一个零值
    // int型chan, 无缓冲阻塞型, 会阻塞发送者, 直到另一个goroutine接收数据
    ch:=make(chan int)
    ch<-1 //发送
    go func() {
        fmt.Println(<-ch) //接收
    }
    

    无缓冲channel

    • 基于无缓存Channels将导致两个goroutine做同步操作, 因此也叫同步channel
    • 当通过一个无缓存Channels发送数据时,接收者收到数据发生在唤醒发送者goroutine之前
    func main() {
        conn, err := net.Dial("tcp", "localhost:8000")
        if err != nil {
            log.Fatal(err)
        }
        done := make(chan struct{})
        go func() {
            io.Copy(os.Stdout, conn) // NOTE: ignoring errors
            log.Println("done")
            done <- struct{}{} // signal the main goroutine
        }()
        mustCopy(conn, os.Stdin)
        conn.Close()
        <-done // wait for background goroutine to finish
    }
    

    串联的Channels(Pipeline)

    Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个Channel的输入。这种串联的Channels就是所谓的管道(pipeline)

    func main() {
        naturals := make(chan int)
        squares := make(chan int)
    
        // Counter
        go func() {
            for x := 0; x < 100; x++ {
                naturals <- x
            }
            close(naturals)
        }()
    
        // Squarer
        go func() {
            for x := range naturals {
                squares <- x * x
            }
            close(squares)
        }()
    
        // Printer (in main goroutine)
        for x := range squares {
            fmt.Println(x)
        }
    }
    

    单向channels

    <-chan左边是一个只读channel, 在右边则只写,形参传参时可隐式转换:

    //改进版本pipeline示例
    func counter(out chan<- int) {
        for x := 0; x < 100; x++ {
            out <- x
        }
        close(out)
    }
    
    func squarer(out chan<- int, in <-chan int) {
        for v := range in {
            out <- v * v
        }
        close(out)
    }
    
    func printer(in <-chan int) {
        for v := range in {
            fmt.Println(v)
        }
    }
    
    func main() {
        naturals := make(chan int)
        squares := make(chan int)
        go counter(naturals)
        go squarer(squares, naturals)
        printer(squares)
    }
    

    带缓冲channels

    • 像一个可以容纳指定个数元素的队列, 在达到容量前写入不会阻塞, 写入读取操作解耦
    ch = make(chan string, 3)
    

    Multiplexing with select

    select多路复用,可以监听多个chan

    • 当一个或多个case满足条件(如chan可发送或接收消息, 类似于读写事件发生), 会随机选择其中一个分支执行
    • default是所有case都阻塞时执行
    • 每一个case代表一个通信操作(在channel上发送或接收)
    • 无分支的select{}会永远等待
    • 因为对一个nil的channel发送和接收操作会永远阻塞,select不会选择到, 可以用nil来激活或者禁用case
    select {
    case <-ch1:
        // ...
    case x := <-ch2:
        // ...use x...
    case ch3 <- y:
        // ...
    default:
        // ...
    }
    

    示例: 并发遍历目录

    原始版本

    // walkDir recursively walks the file tree rooted at dir
    // and sends the size of each found file on fileSizes.
    func walkDir(dir string, fileSizes chan<- int64) {
        for _, entry := range dirents(dir) {
            if entry.IsDir() {
                subdir := filepath.Join(dir, entry.Name())
                walkDir(subdir, fileSizes)
            } else {
                fileSizes <- entry.Size()
            }
        }
    }
    
    // dirents returns the entries of directory dir.
    func dirents(dir string) []os.FileInfo {
        entries, err := ioutil.ReadDir(dir)
        if err != nil {
            fmt.Fprintf(os.Stderr, "du1: %v\n", err)
            return nil
        }
        return entries
    }
    
    • ioutil.ReadDir()返回os.FileInfo(os.Stat()也返回)类型的slice
    • walkDir()递归调调用
    • walkDir()会向fileSizes这个channel发送一条消息告知文件大小

    main包不限制并发的版本(可能产生大量goroutines)

    • 注意for ... select...惯用方式,未使用range循环,用channel接收的二值形式显式判断channel是否close
    • 不用标签break(break loop)的话只会退出内层select
    package main
    
    import (
        "flag"
        "fmt"
        "io/ioutil"
        "os"
        "path/filepath"
    )
    
    func main() {
        // ...determine roots...
        // Traverse each root of the file tree in parallel.
        fileSizes := make(chan int64)
        var n sync.WaitGroup
        for _, root := range roots {
            n.Add(1)
            go walkDir(root, &n, fileSizes)
        }
        go func() {
            n.Wait()
            close(fileSizes)
        }()
        // ...select loop...
        loop:
        for {
            select {
            case size, ok := <-fileSizes:
                if !ok {
                    break loop // fileSizes was closed
                }
                nfiles++
                nbytes += size
            case <-tick:
                printDiskUsage(nfiles, nbytes)
            }
        }
    }
    
    // walkDir修改为并发
    func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
        defer n.Done()
        for _, entry := range dirents(dir) {
            if entry.IsDir() {
                n.Add(1)
                subdir := filepath.Join(dir, entry.Name())
                go walkDir(subdir, n, fileSizes)
            } else {
                fileSizes <- entry.Size()
            }
        }
    }
    

    通过token方式限制并发

    // sema is a counting semaphore for limiting concurrency in dirents.
    var sema = make(chan struct{}, 20)
    
    // dirents returns the entries of directory dir.
    func dirents(dir string) []os.FileInfo {
        sema <- struct{}{}        // acquire token
        defer func() { <-sema }() // release token
        // ...
    

    并发的退出

    Golang未提供一个goroutine终止另一个goroutine的方法, 一般可以通过close done channel的广播方式退出goroutines

    var done = make(chan struct{})
    
    for {
        select {
        case <-done:
            return 
        case v, ok:=<-workChan:
            if !ok {
                return 
            }
        }
    }
    
    // Cancel traversal when input is detected.
    go func() {
        os.Stdin.Read(make([]byte, 1)) // read a single byte
        close(done) // done
    }()
    

    基于共享变量的并发

    goroutine与channel方式的并发免于处理许多细节, 而基于共享数据的并发就不可避免地需要处理它们, 否则一些竞争条件导致程序产生非预期结果,如错误结果、 死锁(deadlock)、活锁(livelock)和饿死(resource starvation),这就不是并发安全的程序了.

    竞争条件

    竞争条件指程序在多个goroutine交叉执行时没法给出确定预期的结果.数据竞争是一种特殊竞争条件, 产生于多个goroutines并发访问共享数据,且至少有一个写操作, 避免办法:

    • 不写
    • 避免多个goroutines访问, 绑定到某个goroutine, 其他的都用channel, pipeline channel传递到下一个阶段线性访问这种叫串行绑定
    • 互斥访问, 做并发控制(加锁等)

    竞争条件检测: 在go build,go run或者go test命令后加-race

    sync.Mutex互斥锁

    类似于二元信号量:

    var (
        sema    = make(chan struct{}, 1) // a binary semaphore guarding balance
        balance int
    )
    
    func Deposit(amount int) {
        sema <- struct{}{} // acquire token
        balance = balance + amount
        <-sema // release token
    }
    
    func Balance() int {
        sema <- struct{}{} // acquire token
        b := balance
        <-sema // release token
        return b
    }
    

    在Mutex中对应Lock,Unlock:

    import "sync"
    
    var (
        mu      sync.Mutex // guards balance
        balance int
    )
    
    func Deposit(amount int) {
        mu.Lock()
        balance = balance + amount
        mu.Unlock()
    }
    
    func Balance() int {
        mu.Lock()
        // defer mu.Unlock()
        b := balance
        mu.Unlock() 
        return b
    }
    
    • 调用Lock()方法来获取一个互斥锁
    • 如果其它goroutine已经获得了该锁,Lock被阻塞直到其它goroutine调用了Unlock
    • mutex会保护共享变量, 惯例来说,被mutex所保护的变量是在mutex变量声明之后立刻声明
    • Lock和Unlock之间的代码段叫临界区
    • defer来调用Unlock, panic也依然会执行
    • 不是可重入锁(递归锁), 无法对已锁mutex再次上锁, 一个通用的解决方案是将一个函数分离为多个函数(导出的加锁调内部的, 内部的执行实际动作不加锁)
    func Withdraw(amount int) bool {
        mu.Lock()
        defer mu.Unlock()
        deposit(-amount)
        if balance < 0 {
            deposit(amount)
            return false // insufficient funds
        }
        return true
    }
    
    func Deposit(amount int) {
        mu.Lock()
        defer mu.Unlock()
        deposit(amount)
    }
    
    // This function requires that the lock be held.
    func deposit(amount int) { balance += amount }
    

    sync.RWMutex读写锁

    • 允许多个只读操作并行执行, 但写操作完全互斥
    • 或叫“多读单写”锁(multiple readers, single writer lock)
    • 读锁RLock, 写锁还是Lock
    var mu sync.RWMutex
    var balance int
    func Balance() int {
        mu.RLock() // readers lock
        defer mu.RUnlock()
        return balance
    }
    

    内存同步

    写入内存前可能cache, 如不互斥访问, 并不能保证多个goroutines中语句执行顺序

    sync.Once

    初始化延迟执行有加速启动等好处, 但如果只能或只需执行一次的init操作延迟到多个goroutines执行, 那就需要保证只执行一次, 这就是sync.Once的作用

    var loadIconsOnce sync.Once
    var icons map[string]image.Image
    // Concurrency-safe.
    func Icon(name string) image.Image {
        loadIconsOnce.Do(func(){
        // initializing
        })
        return icons[name]
    }
    

    Reference

    相关文章

      网友评论

        本文标题:Golang笔记-- 并发模式

        本文链接:https://www.haomeiwen.com/subject/xdkjsctx.html