美文网首页
channel用做限制并发/不定参数

channel用做限制并发/不定参数

作者: zzjack | 来源:发表于2017-10-19 14:43 被阅读0次

原文链接:https://mp.weixin.qq.com/s/8ngivl-NsXj2C4W1jLHFHQ
原文链接: https://segmentfault.com/a/1190000006261218
通过limit管道协调goroutines,以保证同时最多有三个work在运行

var limit = make(chan int,3)
func main(){
  for _,w := range work{
      //note:如果不带参数的话,函数也可以作为参数进行传递,类型是func()
      go func(w func()){
          limit <- 1
          w()
          <- limit
        }
    }
}

pipeline

定义
通过channel从上游接收数据,对数据进行加工形成新数据,然后通过channel发送到下游。

不定参数

用...可以传递不定参数,nums的类型是[]int

func gen(nums ...int) chan int{
    fmt.Println(reflect.TypeOf(nums)) 
    out := make(chan int)
    go func(){
        for _,n := range nums{
            out <- n
        }
        close(out)
    }()
    return out
}

有两种方式传参调用gen

func main(){
//直接传入参数
  gen(1,2,3) 
//传入[]int后面加...
gen([]int{1,2,3}...)
}

匿名函数和锁

这个调用匿名函数的方法和js一样。
加锁的方法

func merge(cs ...<-chan int) <-chan int{
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <- chan int){
        for n := range c{
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _,c := range cs{
        go output(c)
    }

    go func(){
        wg.Wait()
        close(out)
    }()
    return out
}

显示发送者

需要设置一个全局共享的channel,
然后select可以接收输出,这么用。

func sq(done <-chan struct{},in <-chan int) <-chan int{
    out := make(chan int)
    go func(){
        for n:= range in{
            select{
            case out <- n*n:
            case <- done:
                return
            }
        }
        defer close(out)
    }()
    return out
}
func main(){
    //设置一个全局共享的done channel
    //当流水线退出时,关闭done channel
    //所有goroutine接收到done的信号后,
    //都会正常退出。
    done := make(chan struct{})
    defer close(done)
    in := gen(2,3)
    //讲sq的工作分发给两个goroutine
    //这两个goroutine均从in读取数据
    c1 := sq(done,in)
    c2 := sq(done,in)
    out := merge(done,c1,c2)
    fmt.Println(<-out)
}

func gen(nums ...int) <-chan int{
    fmt.Println(reflect.TypeOf(nums))
    out := make(chan int)
    go func(){
        for _,n := range nums{
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(done <-chan struct{},in <-chan int) <-chan int{
    out := make(chan int)
    go func(){
        for n:= range in{
            select{
            case out <- n*n:
            case <- done:
                return
            }
        }
        defer close(out)
    }()
    return out
}

func merge(done <- chan struct{},cs ...<-chan int) <-chan int{
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <- chan int){
        for n := range c{
            select{
            case out<-n:
                case <- done:
            }
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _,c := range cs{
        go output(c)
    }

    go func(){
        wg.Wait()
        close(out)
    }()
    return out
}
  1. 除default外,如果只有一个case语句评估通过,那么就执行这个case里的语句。
  2. 除default外,如果有多个case语句评估通过,那么通过伪随机的方式随机选择执行一个;
  3. case没有匹配上,就执行default;
  4. 没有default就会一直阻塞;
  5. 如果没有for循环,那么匹配执行后就会退出。

相关文章

网友评论

      本文标题:channel用做限制并发/不定参数

      本文链接:https://www.haomeiwen.com/subject/bcwyuxtx.html