进阶-2

作者: hellomyshadow | 来源:发表于2019-09-26 06:54 被阅读0次

    Select

    1. select 可见监听 Channel 上的数据流动;
    2. select 结构与 switch 的结构类似,但select有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个IO操作!
      select {
          case <-ch1:
              //ch1 成功读到数据,则执行该case
          case ch2 <- 1:
              //成功向 ch2 中写入数据,则执行该case
          default:
              //默认执行项
      }
      
      1. 在一个select语句中,Go会按顺序从头至尾评估每一个发送和接收的语句;
      2. 如果其中的任意一条语句可以继续执行,即没有被阻塞,那么就从这些语句中任意选择一条来使用;
      3. 如果没有一条语句可以执行,即所有的通道都被阻塞,那么有两种可能:
        1. 如果提供了 default 语句,那么就会执行,同时程序的执行会从select语句后的语句中恢复;
        2. 如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个通信可以进行下去。
      4. 一旦每次轮询都进入了 default 语句,CPU就会进入 忙轮询 状态,所以通常不会使用 default 语句,而是选择阻塞,出让CPU
    3. 基本使用
      1. select语句本身不带循环机制,还是需要借助for
      2. break语句可以跳出select语句,所以不能试图在select语句中使用break跳出for循环;
          func main() {
              ch := make(chan int)   // 存放数据的通道
              quit := make(chan bool)  // 判断是否退出的通道
              go func() {
                  for {
                      select {
                          case num := <-ch:  // 监听 ch 通道中的数据流动,有则读取出来,并进入分支
                              fmt.Print(num, " ")
                          case <-quit:  // 监听 quit 通道中的数据流动,有则进入分支
                              runtime.Goexit()  // 终止GO程,还可以使用 return 结束 for循环
                      }
                  }
              }()
              x,y := 1,1
              for i:=0; i<20; i++ {  //计算斐波那契数列
                  ch <- x  // 向 ch 通道中写数据
                  x,y = y, x+y
              }
              quit <- true  // 向 quit 通道中写数据
          }
      
      1. case 语句在判断通道上是否有数据时,不仅仅是在判断,如果有数据,它会直接从通道中取出数据,所以不能在分支内获取数据,否则数据会丢失。
          case <-ch:  // 本地数据被丢弃
              num := <-ch  // 此时获取的是下一次的数据
      
    4. 超时机制
      1. 为了避免整个程序进入阻塞情况,可以利用 select 来设置超时;
      2. time.After()设置超时
      go func() {
          for {
              select {
                  case num := <-ch:
                      fmt.Print(num)
                  case <-time.After(5*time.Second):
                      fmt.Print("timeout")
              }
      }()
      
      1. 如果 ch 通道中有数据,则进入num := <-ch分支,那么time.After()的时间会被重置;
      2. 如果num := <-ch也是阻塞的,那么 select 则会陷入阻塞,循环暂时终止;如果阻塞时间达到定时时间5s,则进入<-time.After(5*time.Second)分支;
          func main() {
              ch := make(chan int)
              quit := make(chan bool)
              go func() {
                  for {
                      select {
                          case num := <-ch:
                              fmt.Print(num)
                          case <-time.After(5*time.Second):
                              fmt.Print("timeout")
                              quit <- true  // 超时时间到,向 quit 通道中写入数据,结束程序
                      }
                  }
              }()
              //ch <- 89  // 不向 ch 通道中写入数据,模拟超时
              <- quit  // 主Go程从 quit 通道中读取到数据时,主Go程结束,那么子Go程也会随之结束
          }
      
      1. 由此可见,虽然 ch 通道只有读端,没有写端,但select语句并不会让当前Go程发生死锁,如果读端在select语句之外,则会发生死锁。

    Go语言把锁集成到了Channel中,Channel具备了锁机制。

    死锁

    死锁并不是一种锁,而是使用锁导致的一种现象;

    1. 单Go程自己死锁:Channel应至少有 2 个以上的Go程中进行通信,否则死锁!
      func main() {
          ch := make(chan int)
          ch <- 89
          num := <-ch
      }
      // 写端阻塞,读端不会执行,造成死锁异常
      
    2. Go程间的Channel访问顺序导致死锁
      func main() {
          ch := make(chan int)
          num := <-ch
          go func() {
              ch <- 89
          }
      }
      // 子Go程还没来得及执行,主Go程已经阻塞了,造成死锁异常
      
    3. 多Go程,多Channel交叉死锁
      func main() {
          ch1 := make(chan int)
          ch2 := make(chan int)
          go func() {
              for {
                  select {
                      case num := <-ch1:  //读取通道 ch1 中的数据,写入通道 ch2
                          ch2 <- num
                  }
              }
          }()
          for {
              select {
                  case num := <-ch2:
                      ch1 <- num
              }
          }
      }
      

    互斥锁

    1. 每个资源都对应于一个称为 互斥锁 的标记,这个标记用来保证在任意时刻,只能有一个协程/线程访问该资源;
    2. 互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库 sync 中的 Mutex 结构体类型表示;
    3. sync.Mutex 类型只有两个公开的指针方法,Lock(锁定当前资源)Unlock(解锁)
    4. 在使用互斥锁时,对资源操作完成后,一定要解锁,否则会出现流程执行异常、死锁等问题,通常在锁定后,立即使用defer语句解锁;
      var mutex sync.Mutex   //创建一把互斥锁
      mutex.Lock()   //上锁
      mutex.Unlock()  //解锁
      

    读写锁

    1. 互斥锁的本质是,当一个Goroutine访问时,其他Goroutine都不能访问;这样在资源同步、避免竞争的同时,也降低了程序的并发能力,程序由原来的并行执行变成了串行执行;
    2. 当对一个不会变化资源只做操作时,是不存在资源竞争的,多少Goroutine同时读取都没有问题;所以,竞争的问题在于写数据,只有在之间才存在数据同步问题,之间不存在互斥操作的必要;
    3. 之间使用互斥锁显得很浪费资源,这也就衍生出另一种锁:读写锁,一把具有读属性和写属性的锁;
    4. 读写锁可以让多个读操作并发,即同时读取,但对于写操作却是完全互斥的,也就是说当一个Goroutine进行写操作时,其他Goroutine既不能读,也不能写;
    5. Go中的读写锁由结构体类型sync.RWMutex表示,包含两组方法:
      1. 一组是对写操作的锁定和解锁,简称写锁定写解锁
          func (*RWMutex)Lock()
          func (*RWMutex)Unlock()
      
      1. 另一组是对读操作的锁定和解锁,简称读锁定读解锁
          func (*RWMutex)RLock()
          func (*RWMutex)RUnlock()
      
    6. Channel本身就已经集成了锁,所以尽量不要把互斥锁、读写锁与Channel混用,否则可能造成隐形死锁!
    7. 在多Go程通信时,为了更好地实现数据同步,不会使用Channel,还是会选择使用效率更高的读写锁,这样可以精确控制锁定的范围。

    条件变量

    1. 生产者 -> Channel -> 消费者 模型中,由于Channel自带锁机制,生产者获取到CPU时间轮片时,就会去向Channel中写数据,在这个过程中,Channel会先加锁,然后生产者才能开始写数据,但如果缓冲区中的数据已经满了,本次加锁也就毫无意义,对于消费者来说依然如此,这就是条件变量存在的意义!
    2. 原理:
      生产者 -> 判断条件变量 -> 加锁 -> 向缓冲区中写入数据 -> 唤醒阻塞在条件变量上的消费者
      消费者 -> 判断条件变量 -> 加锁 -> 从缓冲区中读取数据 -> 唤醒阻塞在条件变量上的生产者
      
    3. 条件变量并不保证同一时刻仅有一个协程/线程访问某个共享资源,而是在共享数据的状态发生变化时,通知阻塞在某个条件上的协程/线程;条件变量不是锁,在并发中不能达到同步的目的,所以经常与锁结合使用!
    4. Go标准库中的 sync.Cond 类型表示条件变量;
      type Cond struct {
          noCopy noCopy
          L Locker
          notify notifyList
          checker copyChecker
      }
      
      1. L 表示与条件变量搭配使用的锁;
      2. 对应3个常用方法:Wait、Signal、Broadcast
    5. func (c *Cond) Wait()具备三个作用
      1. 让当前Go程阻塞,等待条件变量满足;
      2. 释放已掌握的互斥锁(读写锁),相当于cond.L.Unlock(),所以调用Wait()Goroutine一定已经加了锁;
      3. 1、2两步是一个原子操作,不可再分!当前Go程已经释放了锁,且处于阻塞状态,等待被唤醒;
      4. 当被唤醒时,Wait()函数返回,解除阻塞状态,并重新获取互斥锁(读写锁),相当于cond.L.Lock(),然后从当前位置继续向下执行。
    6. func (c *Cond) Signal():单发通知,唤醒一个在条件变量上的等待的Goroutine
    7. func (c *Cond) Broadcast():广播通知,唤醒所有在条件变量上等待的Goroutine
          //全局条件变量
          var cond sync.Cond
      
          func producer(w chan<- int, idx int) {
              for {
                  //上锁
                  cond.L.Lock()
                  for len(w) == 5 {  // 判断通道是否已经满了,不能用 if
                      cond.Wait()
                  }
                  num := rand.Intn(800)
                  w <- num  //向通道中写入数据
                  fmt.Printf("生产者%d 生产 %d\n", idx, num)
                  //解锁
                  cond.L.Unlock()
                  //唤醒消费者
                  cond.Signal()
                  //睡眠一会,让出时间轮片,给其他Go程执行
                  time.Sleep(time.Millisecond*200)
              }
          }
          func consumer(r <-chan int, idx int) {
              for {
                  cond.L.Lock()
                  for len(r) == 0 {  // 判断通道中是否有元素可以读取,不能用 if
                      cond.Wait()
                  }
                  num := <-r
                  fmt.Printf("消费者%d,消费 %d\n", idx, num)
                  cond.L.Unlock()
                  cond.Signal()
                  time.Sleep(time.Millisecond*200)
              }
          }
      
          func main() {
              ch := make(chan int, 5)
              rand.Seed(time.Now().UnixNano())
              //使用条件变量,并指定所使用的锁
              cond.L = new(sync.Mutex)
              for i := 0; i < 5; i++ {
                  go producer(ch, i+1)
              }
              for i := 0; i < 3; i++ {
                  go consumer(ch, i+1)
              }
              for {
                  ;
              }
          }
      
      1. 声明条件变量:var cond sync.Cond
      2. 为条件变量指定使用的锁:cond.L = new(sync.Mutex)
      3. 上锁:cond.L.Lock(),解锁:cond.L.Unlock()
      4. 在判断是否 cond.Wait() 时,使用 for,而不是 if,因为阻塞的Go程被重新唤醒时,会从被唤醒的代码处继续向下执行,而不会再去判断是否满足条件!for 循环则会在每次Go程被唤醒时,重新去判断是否满足条件,如果不满足跳出条件,则继续阻塞并释放锁。

    相关文章

      网友评论

          本文标题:进阶-2

          本文链接:https://www.haomeiwen.com/subject/nnkbuctx.html