第4章 并发编程
4.1 并发基础
进程在运行时,都有自己的调用栈和堆,有完整的上下文。操作系统在调试进程的时候,会保存高度进程的上下文环境,等该进程获得时间片后,再恢复该进程的上下文一系统中。
并发应用场景:
- 灵敏的界面响应和大量的运算或者IO同时执行。
- web服务器面对大量的用户请求。
- 在分布式环境上,相同的工作单元在不同计算机上处理着分片的数据。
- IO操作被阻塞,整个程序处于停滞状态,其它IO无关的任务就无法执行。
并发实现模型:
- 操作系统层面的并发基本模式,开销很大。简单,进程间互不影响,独立内存,由内核管理。
- 多线程。
- 基于回调的非阻塞/异步IO。基于线程的客户端连接,会很快地耗尽服务器资源。这种模式通过基于事件的方式使用异步IO,尽可能少地使用线程降低开销。缺点是编程有分割,对反映问题本身不自然。
- 协程本质上是一种用户态线程,不需要操作系统进行抢占式调度。寄存在线程中,开销极小。编程简单,结构清晰。
线程间通信采用共享内存的方式,为保证内存共享的有效性,采用加锁等避免死锁或资源竞争。
4.2 协程
执行体:进程、线程、协程(轻量级线程)。线程和进程通常最多不超过1万,协程可创建数百万个。多数语言不支持协程,通过库支持也仅支持协程的创建、销毁、切换等,而如调用同步IO,如网络IO、文件IO,都会阻塞其它的协程。
go语言协程的切换管理不信赖系统的线程和进程。
4.3 goroutine
goroutine是go语言轻量级线程的实现,由Go运行时管理。
在函数前加go关键字,这个函数就会在一个新的goroutine中并发执行。当函数返回,goroutine结束。如果函数有返回值,会被丢弃。
需要解决的一个问题
主函数并不等待其它goroutine运行完就退出了。
4.4 并发通信(锁)
并发单元间的通信模型:共享数据和消息。
//使用共享变量防止主线程提前结束的代码
var count int = 0//共享变量(注意加锁的共享变量不能用类型扮演)
func Count(lock *sync.Mutex) {
lock.Lock()
count++
fmt.Print(count)
lock.Unlock()//对方法加锁
}
func main() {
lock := &sync.Mutex{}
for i :=0;i<10;i++{
go Count(lock)
}
for{
lock.Lock()
c := count
lock.Unlock()
runtime.Gosched()//类似于Thread.yields(),主线程阻塞等待,所以要让出时间片。
if c >=10 {//主线程结束标志
break
}
}
}
对函数加锁,保护数据。不断检查count值(加锁),达到10时所有线程执行完毕。
另一种消息通信机制channel,认为每个并发单元是自包含的独立的个体,不同的并发单元间这些变量不共享。下节详解。
4.5 channel
channel是进程内通信。如需进程间通信,建议使用分布式系统,如socket,http解决问题。
channel是类型相关的。一个channel只能传递一种类型的值。(socket的通道也只支持一种buffer)
func Count(ch chan int) {
ch <- 1 //向通道写入一个数据,在通道中的数据被读取之前,数据是阻塞的。
fmt.Println("Counting")
}
func main() {
chs := make([] chan int,10)//初始化通道数组。连写两个类型相当于泛型,用空格分开。
for i:=0;i<10;i++{
chs[i] = make(chan int)//通道数组的每个元素仍要初始化。
go Count(chs[i])
}
for _,ch := range (chs){//向通道要数据,相当于完成了Thread.yields()的功能
<- ch
}
}
//为需要共享数据的线程,建立一个通道数组。
在所有的goruntime启动完成之后,通过<-ch语句中读取数据。在对应的channel被写入了数据前也是阻塞的。这样channel实现了类似锁的功能,进而保证了所有goroutine完成之后主函数才返回。
基本语法
//声明:
var chanName chan ElementType
//也可以声明一个map
var chanMap map[string] chan int
//make初始化
ch := make(chan int)
//通道写入数据
chan <- value
//读取通道中的数据
value <- chan//如果channel中没有数据,那么将阻塞直到被写入数据为止。
select
unix的select()方法监控一切文件句柄,一旦有文件句柄发生IO操作,select()返回。用于处理异步IO问题。
go在语言层面支持select,用法如switch。使用要求:每个case语句必须是IO操作。selecct后面不用带判断条件,直接查看case语句。
select{
case <- chan1
//如果成功从chan1读取到数据,则。。。
case chan2 <- 1
//如果成功向chan2写入数据,则。。。
default:
}
例子:
这个例子证明了select的匹配的随机的,并非顺序。
ch := make(chan int,1)//在go1.9中,make方法创建的变量 不用类型扮演。
for{
select{
case ch <- 1:
case ch <- 0:
}
i := <- ch
fmt.Print(i)
}
//功能是无限地向ch写入随机的0或1。
缓冲机制
通道带缓冲很容易,只需要创建通道时指定缓冲区大小即可。
ch := make(chan byte,1024)
//用range关键字循还读取
for i:= range ch{
fmt.Print(i)
}
这样即使没有读取方,在缓冲区没有被写完之前也不会阻塞。
超时机制
并发通信中的超时问题:向channel写入数据时发现channel已满,或者从channel试图读取数据时发现channel为空,可能导致整个goroutine锁死。
另一种情况,永远没人往ch里写数据,也将永远无法从ch中读数据,也会导致整个goruntine永远阻塞。
go语言没有提供超时处理机制(问题),但我们可以利用select机制。
select的特点是只要一个case已经完成,程序就会继续执行,而不考虑其它case的情况。
timeout := make(chan bool,1)
go func(){
time.Sleep(1e9)//等待1秒
timeout <- true
}()//匿名函数直接调有用所以加了个括号。
select{
case ch <-:
case <- timeout:
//如果ch一直没有写入数据,等了1秒之后timeout通道的数据将写入成功,程序不会因此而阻塞
}
以上就是防止channel阻塞导致的goroutine锁死的解决办法。
channel传递
定义:channel本身在定义后也可以通过channel来传递。
利用channel的传递性实现管道:
type PipeData Struct{
value int
handler func(int) int
next chan int
}
func handle(queue chan *PipeData){
for data := range queue{
data.next <- data.handler(data.value)
}
}
//只需要定义一系统的PipeData的数据结构并一起传递给这个函数,就可以达到流式处理数据的目的。
而对比于java、c++实现这些就需要设计一系列的接口。
管道的概念扩充??
单向channel
声明为单向channel,从而限制channel。应用,如保护数据等。
var ch1 chan int
var ch2 chan <- float64//只写channel
var ch <-chan int//只读channel
若单向channel只能读或写,必然是没有意义的。单向channel只不过是对使用者的安全限制。如对于只读channel,先强转后写入。
ch4 := make(chan int)
ch5 := <-chan int(ch4)//单向读ch
ch6 := chan <- int(ch4)//单向写
作用是避免ch出现非期望数据,更好地实现最小权限原则。(个人觉得意义不大,通道相对地流式读写,优势就是双向读写)
关闭channel
channel关闭使用内置函数关闭
close(ch)
4.6 多核并行化
//并行计算N个整数的总和
type Vector []float64
func (v Vector) DoSome(i,n,int,u Vector,c chan int) {
for ; i<n ;i++{
v[i] += u.Op(v[i])
}
c <- 1 //发送信号告诉任务管理者告知已经完成计算
}
const NCPU = 16 //cpu核数,字面值常量不用推演
func (v Vector) DoAll(u Vector) {
ch := make(chan int,NCPU)
for i:=0;i<NCPU;i++{
go v.DoSome(i*len(v)/NCPU,(i+1)*len(v)/NCPU,u,ch)
}
//t等待所有线程执行完成
for i:=0;i<NCPU;i++{
<- ch //获取到数据,表示函数执行完成。
}
}
4.7 让出时间片
在每个goroutine中控制何时==主动==让时间片给其它goroutine,可以ete runtime包中的Gosched()函数实现。
其它精细的协程控制方法详见runtime包。
4.8 同步
sync包提供两种锁sync.Mutex()和sync.RWMutex()。
goroutine获取Mutex锁时,其它线程只能等待锁释放。
RWMutex是单写多读锁,在读锁占用的情况下,阻止写不阻止读。即,多个goroutine(调用RLock()方法)可同时获得读锁。而写锁(调用Lock()方法)会阻止读和写的goroutine进来。RWMutex实现上组合了Mutex。
锁使用上要注意Lock()和Unlock()方法成对出现,否则容易导致goroutine处于饥饿状态,甚至死锁。
var lock sync.Mutex;
func foo(){
lock.Lock()
defer lock.Unlock()//使用defer关键字更优雅地调用Unlock()
//...
}
全局性唯一性操作
实现在全局的范围内只需要运行代码一次,如全局化操作。函数sync.Once.Do()作用类似于volatile。
var a string
var once sync.Once
func Setup(){
a = "hello"
}
func DoPrint(){
once.Do(Setup())
fmt.Print(a)
}
func main(){
go DoPrint()
go DoPrint()
}
若出现重复调用,将会被阻塞,直到全局唯一的once.Do()调用结束。
java中的变通方法是,添加一个全局的 volatile bool,在函数setup()最后一行将bool变量设置为true。在setup()的所有调用之前,判断bool是否已经标记为true,不是则再调用一次setup()。可能出现的问题是,setup()并非原子性操作(还要加锁),可能导致setup()被调有多次,仍无法达到目的。
var done bool = false
func setup(){
a = "hello"
done = true
}
func doprint(){
if !done{
setup()
}
print(a)
}
其它的go提供sync.atomic包提供 基础数据类型的原子操作:
func CompareAndSwapUnit64(val *unit64,old,new unit64)(swapped bool)
网友评论