大家在开发过程中肯定遇见过批量任务处理的,一般都是串行执行,今天我们来个并发版本的
- 创建一个批量执行任务的结构体(也就是面向对象的类)
- 生成要执行的任务
- 执行任务
- 等待执行完成,关闭任务
学习快知识点,需要掌握
- sync.WaitGroup
- 通道
- 并发
// @Title 请填写文件名称(需要改)
// @Description 请填写文件描述(需要改)
package main
import (
"fmt"
"sync"
"time"
)
type Worker interface {
Task()
}
type Pool struct {
work chan Worker `desc:"任务接口需要实现"`
wg sync.WaitGroup `desc:"并发等待组"`
}
// @title 初始化一个任务池
// @description 初始化一个任务池
// @param goNum int "任务池的容量"
func New(goNum int) *Pool {
// TODO 初始化一个任务池
p := &Pool{
work: make(chan Worker),
wg: sync.WaitGroup{},
}
// TODO 追加等待组
p.wg.Add(goNum)
// TODO 创建任务池
for i := 0; i < goNum; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return p
}
func (p *Pool) initWork(wg *sync.WaitGroup, works ...Worker) {
wg.Add(len(works))
for _, work := range works {
go func(work Worker) {
p.Run(work)
wg.Done()
}(work)
}
}
// @title 运行的函数
// @description 将要执行的函数发送到通道
// @param w Worker "解释"
func (p *Pool) Run(w Worker) {
p.work <- w
}
// @title 关闭任务池
// @description 关闭任务池
func (p *Pool) ShutDown() {
// TODO 关闭通道
close(p.work)
// TODO 等待goruntinue关闭
p.wg.Wait()
}
type namePrint struct {
name string `desc:"名字"`
}
// @title 输入名字
// @description 输入名字,实现上面的worker接口
func (n *namePrint) Task() {
fmt.Println(n.name)
time.Sleep(time.Microsecond * 100)
}
// @title 生成namePrint切片
// @description 生成namePrint切片
// @return namePrints []*namePrint "namePrint切片"
func generateNamePrint() (namePrints []Worker) {
// TODO 声明一个名字切片
names := []string{"one", "two", "three", "four", "five"}
for _, name := range names {
np := &namePrint{name: name}
namePrints = append(namePrints, np)
}
return
}
func main() {
// TODO 创建一个任务的等待组
wg := &sync.WaitGroup{}
// TODO 使用两个goroutine来创建工作池
p := New(2)
// TODO 生成执行任务
p.initWork(wg, generateNamePrint()...)
// TODO 等待执行
wg.Wait()
// TODO 关闭执行
p.ShutDown()
}
网友评论