美文网首页
Go并发编程

Go并发编程

作者: 宏势 | 来源:发表于2022-04-22 16:31 被阅读0次

Go天生为并发而生,使用go 关键字快捷启动线程(goroutine)轻量级线程也叫协程,一个goroutine大概占2KB内存。

简单例子:

func run(msg string){
    for i:=0; i < 5; i++ {
        fmt.Println(msg,i)
    }
}
func main() {
    run("Direct Run:")
    go run("Thread Run:") //go 关键字,启动goroutine
    run("After Run:")
    time.Sleep(30 *time.Second)
    fmt.Println("done")
}

MPG 模型

MPG是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。


MPG.png
  • M代表一个操作系统内核线程,也可以称为一个工作线程,默认最大10000
  • G协程(goroutine),最终是要放到M上执行的
  • P代表着处理器(processor)维护一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界等等),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了,会从从全局队列拿一些到自己队列上,或者从其它P队列偷一些到自己队列上,或者。P的数量通过runtime.GOMAXPROCS()设置,默认值是cpu核数,go 1.5之前默认值是1,P的队列的G数量不超过256个。

执行步骤

  1. go 关键字创建协程,优先加入某个P的本地队列,满了加入全局队列
  2. P 需要持有或者绑定一个M,而M会启动一个线程,不断从P的本地队列获取G执行
  3. 当P的本地队列执行完成以后,会从全局队列取G,如果全局队列没有G,再从其它P队列窃取G到本地队列
  4. 重复执行以上步骤,直至所有G执行完毕

调度策略

复用线程:避免频繁的创建、销毁线程,而是对线程的复用

  • Work-Stealing (任务窃取)
    当M无可运行的G时,尝试从全局队列获取,没有从网络轮询器获取,再没有从其它处理器的本地队列窃取
  • Hand Off 机制,当M因为G进行系统调用阻塞时,M会与P解绑,同时带走G,把P转移给其它空闲的线程执行

Go语言相比起其他语言的优势在于goroutine则是由Go运行时(runtime)调度器自己调度的,而其它语言是OS调度。特点是goroutine的调度是在用户态下完成的,不涉及内核态与用户态之间的频繁切换,调度成本比调度OS线程低很多, 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上(调度m个goroutine到n个OS线程),再加上本身goroutine的超轻量(占内存2KB),有效保证了Go的并发性能。

并发模型

Go 支持两种并发模型:CSP模型共享内存模型

  • CSP模型:communicating sequential processes 通信顺序进程
    核心概率:奉行通过通道来共享内存,而不是共享内存来通信
  • 共享内存模型:通过共享内存来+锁机制实现

通道Chan

通道(channel)是用来传递数据的一种特殊类型,常用于两个协程通讯和同步运行(自带锁机制)。通道就像一个传送带或者队列,遵循先入先出,保证收发数据的顺序,声明的时候要指定数据类型。
通道可以分为无缓冲通道和缓冲通道
通道根据读写分为:"chan" 可读可写, "chan<-" 仅可写, "<-chan" 仅可读

简单来说,通道是一个线程安全共享池

无缓冲通道

采用 make(chan 元素类型) 创建无缓冲通道,使用无缓冲通道进行通信将导致发送和接收的goroutine同步化,所以也叫同步通道

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // 把 sum 发送到通道 c
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int) //int 类型 通道
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x := <-c
    y := <-c // 从通道 c 中接收
    fmt.Println(x, y, x+y)
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

缓冲通道

采用make(chan 元素类型, 缓冲大小) 创建缓冲通道

利用缓冲通道实现生产者消费者例子

type apple struct{
    name string
}
func produce(bufChan chan apple){
    index := 0
    for {
        time.Sleep(100*time.Millisecond)
        index ++
        bufChan <- apple{"apple" + strconv.Itoa(index)}
        fmt.Println("produce apple", index)
    }
}
func consume(bufchan chan apple){
    for {
        time.Sleep(120*time.Millisecond)
        result := <- bufchan
        fmt.Println("consume ", result)
    }
}
func main() {
    bufList :=  make(chan apple, 10) //容量为10,cap()
    go produce(bufList)
    go consume(bufList)
    select {
    }
}

1.缓冲通道读写不需要在两个不同协程里面
2.可以只有写入操作;但是不能只有读操作,运行时会报deadlock错误
3.当通道满了以后,写入会阻塞,当通道为空,读取堵塞
4.当通道不使用时,要记得关闭通道

关闭通道

func main() {

    ch1 := make(chan int, 5)
    ch2 := make(chan int, 5)
    go func() {
        for i := 0; i < 20; i++ {
            ch1 <- i
        }
        close(ch1) //使用完关闭通道,通知读取协程
    }()
    go func() {
        for {
            if result,ok :=<-ch1; ok{
                ch2 <- result* result
            }else{  //ch1通道关闭,取完值 ok=false
                break
            }
        }
        close(ch2)
    }()
    for result := range ch2{  //ch2 通道关闭,取完值range循环自动退出
        fmt.Println(result)
    }
}

1.对一个关闭的通道进行接收会一直获取值直到通道为空
2.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
3.对一个关闭的通道再发送值就会导致panic
4.关闭一个已经关闭的通道会导致panic

判断通道关闭,通常采用for range 通道的方式

select

select 是go的一个控制结构,类似switch语句,只是每个case都必须是一个通讯操作(chan读写)

超时控制例子

func main() {

    chan1 := make(chan int)
    quit := make(chan bool)
    go func() {
        for {
            select {
            case <- time.After(5*time.Second):
                fmt.Println("timeout...")
                quit <- true
            case result,_:= <-chan1:
                fmt.Println(result)
            }
        }
    }()
    for i:= 0; i < 5; i++ {
        chan1 <- i
        time.Sleep(time.Second)
    }
    <- quit
    fmt.Println("done...")

}

定时任务例子:

func main() {
    tickTimer := time.NewTicker(1 * time.Second)
    tickTimer1 := time.NewTicker(20 * time.Second)
    startTime:= time.Now().Add(60*time.Second)
    for {
        select {
        case <-tickTimer.C:
            fmt.Println("tickTime:",time.Now())
        case <-tickTimer1.C:
            fmt.Println("tickTime1:", time.Now())
        }
        if time.Now().After(startTime) {
            fmt.Println("exit:",time.Now())
            break
        }
    }
}

1.select里面case操作必须是通讯IO操作(channel的收发)
2.select中如果没有default语句,会阻塞等待任意一个case,满足则执行
3.select中如果没有default语句,多个case同时满足条件,随机选择一个case执行(case执行顺序是完全随机)
4.select中default语句,没有满足的case条件时执行
5.当select没有任何语句的时候,会一直阻塞

并发同步

sync 是go语言并发同步的包

sync.WaitGroup

可以使用sync.WaitGroup来实现并发任务的同步,而不是使用time.Sleep

func DoSomething(wg * sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("do something...")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go DoSomething(&wg)
    go DoSomething(&wg)
    wg.Wait()
    fmt.Println("done...")
}

sync.WaitGroup 是结构体,参数传递要传递结构体指针

sync.Once

确保某些操作高并发的场景下执行一次

func Init(){
    fmt.Println("init")
}
func main() {
    var once sync.Once
    go once.Do(Init) 
    go once.Do(Init)
    time.Sleep(5*time.Second)
}

sync.Once是结构体,参数传递要传递结构体指针

互斥锁

var num = 0
var lock sync.Mutex
var wg sync.WaitGroup
func increment(){
    lock.Lock()
    num = num + 1
    lock.Unlock()
    wg.Done()
}
func main() {
    wg.Add(300)
    for i:=0; i < 300; i++ {
        go increment()
    }
    wg.Wait()
    fmt.Println(num)
}

读写锁

var (
    num = 0
    rw sync.RWMutex
    wg sync.WaitGroup
)
func Write(){
    rw.Lock()
    num = num+1
    time.Sleep(10*time.Millisecond)
    rw.Unlock()
    wg.Done()
}
func Read(){
    rw.RLock()
    time.Sleep(time.Millisecond)
    rw.RUnlock()
    wg.Done()
}

func main() {
    for i:=0;i < 100; i++ {
        wg.Add(1)
        go Write()
    }
    for i:=0;i < 300; i++ {
        wg.Add(1)
        go Read()
    }
    wg.Wait()
    fmt.Println("result:",num)

}

原子操作

sync/atomic包提供,主要如下操作:

方法 说明
func LoadInt32(addr *int32) (val int32) 读取操作
func StoreInt32(addr *int32, val int32) 写入操作
func AddInt32(addr *int32, delta int32) (new int32) 修改操作
func SwapInt32(addr *int32, new int32) (old int32) 交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) 比较并交换操作

sync.Once 源码如下:

type Once struct {
    done uint32
    m    Mutex
}
func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        // Outlined slow-path to allow inlining of the fast-path.
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

相关文章

  • Go基础语法(九)

    Go语言并发 Go 是并发式语言,而不是并行式语言。 并发是指立即处理多个任务的能力。 Go 编程语言原生支持并发...

  • Go并发

    Go语言中的并发编程 并发是编程里面一个非常重要的概念,Go语言在语言层面天生支持并发,这也是Go语言流行的一个很...

  • 瞅一眼就会使用GO的并发编程分享

    [TOC] GO的并发编程分享 之前我们分享了网络编程,今天我们来看看GO的并发编程分享,我们先来看看他是个啥 啥...

  • day08-go.GPM

    当别人到go为什么支持高并发,或者问为什么go本身对并发编程友好?以及go与Java对比的并发对比 正确回答: 在...

  • Go语言简介

    Go语言简介 Go语言设计的初衷 针对其他语言的痛点进行设计并加入并发编程为大数据,微服务,并发而生的通用编程语言...

  • 13 Go并发编程初探

    一、Go并发编程概述 Go以并发性能强大著称,在在语言级别就原生支持,号称能实现百万级并发,并以此独步江湖,本专题...

  • Go 基础

    基础 [TOC] 特性 Go 并发编程采用CSP模型不需要锁,不需要callback并发编程 vs 并行计算 安装...

  • go 并发编程

    在资源有限的情况下,如何最大化的利用有限的资源就是并发,提高并发 goroutine runtime包 chann...

  • go并发编程

    最近挖了个坑开始学习go语言,打断把其中遇到的坑都记录下来 go学习的过程中最为惊叹的就是并发编程了,我可以少掉好...

  • Go并发编程

    常见并发模型 进程 & 线程(Apache)C10K 异步非阻塞(Nginx,Libevent,NodeJS) 复...

网友评论

      本文标题:Go并发编程

      本文链接:https://www.haomeiwen.com/subject/ldrbjrtx.html