能做啥?
当你需要推送1000万条消息,推送每条消息大概需要花10ms。单线程慢慢发你肯定等不了,为每个发送行为开一个协程,哈哈哈,1000万个协程,4kb * 10000000 ,按最小的栈空间算40个G内存算是没了,就算你内存顶得住1000万的并发估计你服务器的文件数,cpu占用早就超过最大值了。入正题,这个时候你需要管理你的协程数,比如说我开辟一个1000的协程池,让这1000个协程并发的去推送1000万数量的消息即可。
通过 Channel 实现 Goroutine Pool,缺点协程的频繁开辟和注销,但简单通用灵活
package gopool
import (
"sync"
)
// Pool Goroutine Pool
type Pool struct {
queue chan int
wg *sync.WaitGroup
}
// New 新建一个协程池
func New(size int) *Pool {
if size <= 0 {
size = 1
}
return &Pool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}
// Add 新增一个执行
func (p *Pool) Add(delta int) {
// delta为正数就添加
for i := 0; i < delta; i++ {
p.queue <- 1
}
// delta为负数就减少
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}
// Done 执行完成减一
func (p *Pool) Done() {
<-p.queue
p.wg.Done()
}
// Wait 等待Goroutine执行完毕
func (p *Pool) Wait() {
p.wg.Wait()
}
以上是gopool的包实现,以下demo取用
package main
import (
"io/ioutil"
"log"
"net/http"
"yumc.pw/cloud/lib/gopool"
)
func main() {
// 这里限制100个并发
pool := gopool.New(100)// sync.WaitGroup{}
//假设需要发送1000万个http请求,然后我并发100个协程取完成这件事
for i := 0; i < 10000000; i++ {
pool.Add(1) //发现已存在100个人正在发了,那么就会卡住,直到有人完成了宣布自己退出协程了
go func(i int) {
resp, err := http.Get("http://ip.3322.org")
if err != nil {
fmt.Println(i, err)
} else {
defer resp.Body.Close()
result, _ := ioutil.ReadAll(resp.Body)
fmt.Println(i, string(result))
}
pool.Done()
}(i)
}
pool.Wait()
}
以上思路是我在go社区看到的,确实简单有效的做到了协程数量的控制,但是有木有发现会存在频发的协程的开辟与剔除,如果对性能有着很高的要求建议优化成固定数目的协程取channel里面取数据进行消费的模式,也就是消费者模式,这样避免了协程的创建与注销。
上代码
package main
import (
"fmt"
"strconv"
"sync"
)
//任务对象
type task struct {
Production
Consumer
}
//设置消费者数目,也就是work pool大小
func(t *task)setConsumerPoolSize(poolSize int){
t.Production.Jobs = make(chan *Job,poolSize * 10)
t.Consumer.WorkPoolNum = poolSize
}
//任务数据对象
type Job struct {
Data string
}
func NewTask(handler func(jobs chan *Job)(b bool))(t *task){
t = &task{
Production:Production{Jobs: make(chan *Job,100)},
Consumer:Consumer{WorkPoolNum:100,Handler:handler},
}
return
}
type Production struct {
Jobs chan *Job
}
func (c Production)AddData(data *Job){
c.Jobs <- data
}
type Consumer struct {
WorkPoolNum int
Handler func(chan *Job)(b bool)
Wg sync.WaitGroup
}
//异步开启多个work去处理任务,但是所有work执行完毕才会退出程序
func (c Consumer)disposeData(data chan *Job){
for i:=0;i<=c.WorkPoolNum;i++{
c.Wg.Add(1)
go func() {
defer func() {
c.Wg.Done()
}()
c.Handler(data)
}()
}
c.Wg.Wait()
}
func main(){
//1.先实现一个用于处理数据的闭包,在这里面实现自己业务
consumerHandler := func(jobs chan *Job)(b bool) {
for job := range jobs {
fmt.Println(job)
}
return
}
//2.new一个任务处理对象出来
t :=NewTask(consumerHandler)
t.setConsumerPoolSize(500)//500个协程同时消费
//3.根据自己的业务去生产数据通过AddData方法去添加数据到生产channel,这里是1000万条数据
go func(){
for i := 0; i < 10000000; i++ {
job := new(Job)
iStr := strconv.Itoa(i)
job.Data = "这里面去定义你的任务数据格式"+ iStr
t.AddData(job)
}
}()
//4.消费者消费数据
t.Consumer.disposeData(t.Production.Jobs)
}
网友评论