github
https://github.com/panjf2000/ants
基本使用
- 阻塞式协程池例子
// TestAntsPool demonstrates a blocking ants pool. The pool holds a single
// worker, so while that worker is busy every further Invoke call waits for it
// to become available instead of failing.
func TestAntsPool(t *testing.T) {
	// Worker function: print the argument, then keep the worker busy for
	// five seconds so that later submissions have to wait.
	f := func(i interface{}) {
		fmt.Println("running ", i)
		time.Sleep(5 * time.Second)
	}

	// Create a blocking pool whose size is 1.
	p, err := ants.NewPoolWithFunc(1, f, ants.WithNonblocking(false))
	if err != nil {
		// Without a pool the calls below would dereference nil.
		t.Fatalf("creating pool: %v", err)
	}
	// Release the pool once the test is done with it.
	defer p.Release()

	// Submit every task from its own goroutine so the submissions compete
	// for the single worker.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i, " invoke time", time.Now())
			err := p.Invoke(i)
			fmt.Println(i, " invoke finished time", time.Now())
			if err != nil {
				fmt.Println("err: ", err.Error())
			}
		}(i)
	}
	// Wait until every Invoke call has returned.
	wg.Wait()
}
output
0 invoke time 2022-01-10 14:16:53.254327218 +0800 CST m=+0.271339451
4 invoke time 2022-01-10 14:16:53.254330755 +0800 CST m=+0.271342968
1 invoke time 2022-01-10 14:16:53.254351734 +0800 CST m=+0.271363967
2 invoke time 2022-01-10 14:16:53.254340133 +0800 CST m=+0.271352346
0 invoke finished time 2022-01-10 14:16:53.254397901 +0800 CST m=+0.271410114
running 0
3 invoke time 2022-01-10 14:16:53.254358828 +0800 CST m=+0.271371051
4 invoke finished time 2022-01-10 14:16:58.255227731 +0800 CST m=+5.272239934
running 4
1 invoke finished time 2022-01-10 14:17:03.259344098 +0800 CST m=+10.276356311
running 1
2 invoke finished time 2022-01-10 14:17:08.263325553 +0800 CST m=+15.280337766
running 2
3 invoke finished time 2022-01-10 14:17:13.263580826 +0800 CST m=+20.280593039
running 3
PASS
上面的例子中,任务 1、2、3、4 的 Invoke 从被调用到返回都存在明显的时间差(约 5 到 20 秒不等):协程池中只有一个 worker,Invoke 内部会先去寻找一个可用的 worker 协程,找不到时调用方会一直等待:
// Invoke hands args to a worker of the pool.
//
// It returns ErrPoolClosed when the pool reports itself closed, and
// ErrPoolOverload when retrieveWorker yields no worker. Otherwise args is
// sent on the worker's args channel and nil is returned.
func (p *PoolWithFunc) Invoke(args interface{}) error {
	// A closed pool accepts no further tasks.
	if p.IsClosed() {
		return ErrPoolClosed
	}

	// Obtain a worker; nil means none could be provided.
	var worker *goWorkerWithFunc = p.retrieveWorker()
	if worker == nil {
		return ErrPoolOverload
	}

	// Pass the task argument to the worker.
	worker.args <- args
	return nil
}
在阻塞式协程池中,如果 retrieveWorker() 找不到可用的 worker 协程,内部会 wait() 阻塞住,等到存在可用的 worker 协程时再被唤醒;而在非阻塞式协程池中,如果找不到可用的 worker 协程,retrieveWorker() 会返回 nil,此时 Invoke 会直接返回 ErrPoolOverload。
- 非阻塞式协程池例子
// TestAntsPool demonstrates a non-blocking ants pool. The pool holds a single
// worker, so a submission that finds no available worker gets an error back
// from Invoke right away instead of waiting.
func TestAntsPool(t *testing.T) {
	// Worker function: print the argument, then keep the worker busy for
	// five seconds so that the other submissions find no free worker.
	f := func(i interface{}) {
		fmt.Println("running ", i)
		time.Sleep(5 * time.Second)
	}

	// Create a non-blocking pool whose size is 1.
	p, err := ants.NewPoolWithFunc(1, f, ants.WithNonblocking(true))
	if err != nil {
		// Without a pool the calls below would dereference nil.
		t.Fatalf("creating pool: %v", err)
	}
	// Release the pool once the test is done with it.
	defer p.Release()

	// Submit every task from its own goroutine so the submissions compete
	// for the single worker.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i, " invoke time", time.Now())
			err := p.Invoke(i)
			fmt.Println(i, " invoke finished time", time.Now())
			if err != nil {
				// Rejected submissions are reported together with their index.
				fmt.Println(i, " err: ", err.Error())
			}
		}(i)
	}
	// Wait until every Invoke call has returned.
	wg.Wait()
}
output
2 invoke time 2022-01-10 15:16:35.271265597 +0800 CST m=+0.186927295
4 invoke time 2022-01-10 15:16:35.271254096 +0800 CST m=+0.186915783
4 invoke finished time 2022-01-10 15:16:35.271326442 +0800 CST m=+0.186988179
2 invoke finished time 2022-01-10 15:16:35.271327854 +0800 CST m=+0.186989541
3 invoke time 2022-01-10 15:16:35.271310131 +0800 CST m=+0.186971818
3 invoke finished time 2022-01-10 15:16:35.2713415 +0800 CST m=+0.187003187
3 err: too many goroutines blocked on submit or Nonblocking is set
running 2
4 err: too many goroutines blocked on submit or Nonblocking is set
0 invoke time 2022-01-10 15:16:35.271258314 +0800 CST m=+0.186919991
0 invoke finished time 2022-01-10 15:16:35.271353092 +0800 CST m=+0.187014779
0 err: too many goroutines blocked on submit or Nonblocking is set
1 invoke time 2022-01-10 15:16:35.271356658 +0800 CST m=+0.187018345
1 invoke finished time 2022-01-10 15:16:35.271394629 +0800 CST m=+0.187056316
1 err: too many goroutines blocked on submit or Nonblocking is set
网友评论