美文网首页
GO学习笔记(18) - 并发编程(4) - 锁与传统同步机制

GO学习笔记(18) - 并发编程(4) - 锁与传统同步机制

作者: 卡门001 | 来源:发表于2021-07-10 18:09 被阅读0次

目录

  • 传统同步机制
    • Mutex
  • 并发模式-服务任务模式
    • 循环模式
    • select模式

传统同步机制

在GO语言中,不建议用传统的同步机制,但不等于不提供。而且在某一些场景下有其用途。GO提供的传统的同步机制提供有 WaitGroup、Mutex、Cond等,本文重点解决Mutex方式,其他原理类似。为了更加的利于理解Mutex,本文将提供两段代码,来说明用加锁的意义。

Mutext的使用

  • 无锁的情况

编译前可用go run -race atomic.go会看到读写存在冲突

package main
import (
    "fmt"
    "time"
)

type atomicInt int

func (a *atomicInt) increment()  {
    * a++
}

func (a *atomicInt) get()  int  {
    return int(*a)
}
func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

  • 加锁后
    再运行go run -race atomic.go可以看出冲突不再了
package main

import (
    "fmt"
    "sync"
    "time"
)


type atomicInt struct {
    value int
    lock sync.Mutex
}

func (a *atomicInt) increment()  {
    fmt.Println("safe increment")
    go func() {
        a.lock.Lock()
        //保证defer在func这段函数体内
        defer a.lock.Unlock() 
        a.value++
    }()
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return int(a.value)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

-服务任务模式:等待多个服务

服务/任务模式

任务/服务模式

  • 方式一:循环模式
package main

import (
    "fmt"
    "math/rand"
    "time"
)

//发送
func msgGen(name string) chan string{
    c := make(chan string)
    //送数据
    go func() {
        i := 0
        //Sprintf并不会打印到终端
        for {
            time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
            c <- fmt.Sprintf("service:%s, message %d \n", name,i)
            i++
        }
    }()
    return c
}

//适用于chan未知个数的情况
func fanInt(chs ...chan string) chan string {
    c := make(chan string)
    for _, ch := range chs{
        go func(in chan string) { //每一个ch开一个goroutoine
            for {
                //因为每次main"fmt.Println( <- m)”时,才从c <- <-ch读数据,
                //所以为了保证读的当时数据的稳定,在进入并发函数时,
                //传入in参数或在外定义chopy(chopy:=ch),故一个备份,保证数据的稳定
                c <- <-in
            }
        }(ch) //表示:将ch 复制到 in,并传入函数体内
    }
    return c
}


func main() {
    m1 := msgGen("service1")
    m2 := msgGen("service2")
    m3 := msgGen("service3")
    m := fanInt(m1,m2,m3)
    for{
        fmt.Println( <- m)
    }
}

  • 方式二:select模式
package main

import (
    "fmt"
    "math/rand"
    "time"
)

//发送
func msgGen(name string) chan string{
    c := make(chan string)
    //送数据
    go func() {
        i := 0
        //Sprintf并不会打印到终端
        for {
            time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
            c <- fmt.Sprintf("service:%s, message %d \n", name,i)
            i++
        }
    }()
    return c
}


//接收,利用select能力,只需要一个 goroutine
func finInBySelect(c1, c2 chan string) chan string {
    c := make(chan string)
    go func() {
        for{
            select{
            case m := <-c1:
                c <- m
            case m := <-c2:
                c<-m
            }
        }
    }()
    return c
}

func main() {
    m1 := msgGen("service1")
    m2 := msgGen("service2")
    m := finInBySelect(m1,m2)
    for{
        fmt.Println( <- m)
    }
}

任务控制

下述代码是在循环模式中增加了任务控制函数

    1. 非阻塞等待
    1. 超时机制
    1. 任务中断或优雅退出
package main

import (
    "fmt"
    "math/rand"
    "time"
)

//发送
func msgGen(name string,done chan struct{}) chan string{
    c := make(chan string)
    //送数据
    go func() {
        i := 0
        //Sprintf并不会打印到终端
        for {
            select{
            case <- time.After(time.Duration(rand.Intn(5000))*time.Millisecond):
                c <- fmt.Sprintf("service:%s, message %d \n", name,i)
            case <-done:
                fmt.Println("clean up")
                time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
                fmt.Println("clean done")
                done <- struct{}{}
                return //主动退出
            }
            i++
        }
    }()
    return c
}

//适用于chan未知个数的情况
func fanInt(chs ...chan string) chan string {
    c := make(chan string)
    for _, ch := range chs{
        go func(in chan string) { //每一个ch开一个goroutoine
            for {
                //因为每次main"fmt.Println( <- m)”时,才从c <- <-ch读数据,
                //所以为了保证读的当时数据的稳定,在进入并发函数时,传入in参数或在外定义chopy(chopy:=ch),故一个备份,保证数据的稳定
                c <- <-in
            }
        }(ch) //表示:将ch 复制到 in,并传入函数体内
    }
    return c
}


/**
1. 非阻塞等待
2. 超时机制
3. 任务中断或退出
4. 优雅退出
*/
type ret struct {
    val string
    ok bool
}
//非阻塞等待
func nonBlockingWait(c chan string) ret {
    select {
    case m := <-c:
        return ret{
            val: m,
            ok:true,
        }
    default:
        return ret{
            val: "",
            ok: false,
        }
    }
}

//超时机制


func timeoutWait(c chan string,timeout time.Duration)  ret{
    select {
    case m := <-c:
        return ret{
            val: m,
            ok:true,
        }
    case <- time.After(timeout):
        return ret{
            val: "",
            ok: false,
        }
    }
}
//优雅退出


func main() {
    done := make(chan struct{})
    m1 := msgGen("service1",done)
    m2 := msgGen("service2",done)
    m := fanInt(m1,m2)
    for{
        fmt.Println( <- m)
        //超时等待
        if ret := timeoutWait(m,2); ret.ok {
        //if ret := nonBlockingWait(m); ret.ok {
            fmt.Println(m)
        }else{
            fmt.Println("no m")
        }
    }

    done <- struct{}{}
    <- done
}

相关文章

网友评论

      本文标题:GO学习笔记(18) - 并发编程(4) - 锁与传统同步机制

      本文链接:https://www.haomeiwen.com/subject/wicwultx.html