目录
- 传统同步机制
- Mutex
- 并发模式-服务任务模式
- 循环模式
- select模式
传统同步机制
在GO语言中,不建议用传统的同步机制,但不等于不提供。而且在某一些场景下有其用途。GO提供的传统的同步机制提供有 WaitGroup、Mutex、Cond等,本文重点解决Mutex方式,其他原理类似。为了更加的利于理解Mutex,本文将提供两段代码,来说明用加锁的意义。
Mutext的使用
- 无锁的情况
编译前可用
go run -race atomic.go
会看到读写存在冲突
package main
import (
"fmt"
"time"
)
type atomicInt int
func (a *atomicInt) increment() {
* a++
}
func (a *atomicInt) get() int {
return int(*a)
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get())
}
- 加锁后
再运行go run -race atomic.go
可以看出冲突不再了
package main
import (
"fmt"
"sync"
"time"
)
type atomicInt struct {
value int
lock sync.Mutex
}
func (a *atomicInt) increment() {
fmt.Println("safe increment")
go func() {
a.lock.Lock()
//保证defer在func这段函数体内
defer a.lock.Unlock()
a.value++
}()
}
func (a *atomicInt) get() int {
a.lock.Lock()
defer a.lock.Unlock()
return int(a.value)
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get())
}
-服务任务模式:等待多个服务

任务/服务模式
- 方式一:循环模式
package main
import (
"fmt"
"math/rand"
"time"
)
//发送
func msgGen(name string) chan string{
c := make(chan string)
//送数据
go func() {
i := 0
//Sprintf并不会打印到终端
for {
time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
c <- fmt.Sprintf("service:%s, message %d \n", name,i)
i++
}
}()
return c
}
//适用于chan未知个数的情况
func fanInt(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs{
go func(in chan string) { //每一个ch开一个goroutoine
for {
//因为每次main"fmt.Println( <- m)”时,才从c <- <-ch读数据,
//所以为了保证读的当时数据的稳定,在进入并发函数时,
//传入in参数或在外定义chopy(chopy:=ch),故一个备份,保证数据的稳定
c <- <-in
}
}(ch) //表示:将ch 复制到 in,并传入函数体内
}
return c
}
func main() {
m1 := msgGen("service1")
m2 := msgGen("service2")
m3 := msgGen("service3")
m := fanInt(m1,m2,m3)
for{
fmt.Println( <- m)
}
}
- 方式二:select模式
package main
import (
"fmt"
"math/rand"
"time"
)
//发送
func msgGen(name string) chan string{
c := make(chan string)
//送数据
go func() {
i := 0
//Sprintf并不会打印到终端
for {
time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
c <- fmt.Sprintf("service:%s, message %d \n", name,i)
i++
}
}()
return c
}
//接收,利用select能力,只需要一个 goroutine
func finInBySelect(c1, c2 chan string) chan string {
c := make(chan string)
go func() {
for{
select{
case m := <-c1:
c <- m
case m := <-c2:
c<-m
}
}
}()
return c
}
func main() {
m1 := msgGen("service1")
m2 := msgGen("service2")
m := finInBySelect(m1,m2)
for{
fmt.Println( <- m)
}
}
任务控制
下述代码是在循环模式中增加了任务控制函数
- 非阻塞等待
- 超时机制
- 任务中断或优雅退出
package main
import (
"fmt"
"math/rand"
"time"
)
//发送
func msgGen(name string,done chan struct{}) chan string{
c := make(chan string)
//送数据
go func() {
i := 0
//Sprintf并不会打印到终端
for {
select{
case <- time.After(time.Duration(rand.Intn(5000))*time.Millisecond):
c <- fmt.Sprintf("service:%s, message %d \n", name,i)
case <-done:
fmt.Println("clean up")
time.Sleep(time.Duration(rand.Intn(2000))*time.Millisecond)
fmt.Println("clean done")
done <- struct{}{}
return //主动退出
}
i++
}
}()
return c
}
//适用于chan未知个数的情况
func fanInt(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs{
go func(in chan string) { //每一个ch开一个goroutoine
for {
//因为每次main"fmt.Println( <- m)”时,才从c <- <-ch读数据,
//所以为了保证读的当时数据的稳定,在进入并发函数时,传入in参数或在外定义chopy(chopy:=ch),故一个备份,保证数据的稳定
c <- <-in
}
}(ch) //表示:将ch 复制到 in,并传入函数体内
}
return c
}
/**
1. 非阻塞等待
2. 超时机制
3. 任务中断或退出
4. 优雅退出
*/
type ret struct {
val string
ok bool
}
//非阻塞等待
func nonBlockingWait(c chan string) ret {
select {
case m := <-c:
return ret{
val: m,
ok:true,
}
default:
return ret{
val: "",
ok: false,
}
}
}
//超时机制
func timeoutWait(c chan string,timeout time.Duration) ret{
select {
case m := <-c:
return ret{
val: m,
ok:true,
}
case <- time.After(timeout):
return ret{
val: "",
ok: false,
}
}
}
//优雅退出
func main() {
done := make(chan struct{})
m1 := msgGen("service1",done)
m2 := msgGen("service2",done)
m := fanInt(m1,m2)
for{
fmt.Println( <- m)
//超时等待
if ret := timeoutWait(m,2); ret.ok {
//if ret := nonBlockingWait(m); ret.ok {
fmt.Println(m)
}else{
fmt.Println("no m")
}
}
done <- struct{}{}
<- done
}
网友评论