具体的池子
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
const (
defaultCapacity = 10
expireSecond = 2
)
type Pool struct {
capacity int32 // 最大同时执行的协程
running int32 // 当前在执行的
waiting int32 // 当前等待的数量
state int32 // 当前池子的状态,如果为1,则关闭
lock sync.Locker // 自旋锁,实现了加锁和解锁方法
workers workerQueue // worker队列,实现了一个接口,不同实现方式代表不同的队列
workerCache sync.Pool // 存放每个创建出来的worker
cond *sync.Cond // 当协程过多的时候需要阻塞,然后唤醒
}
func newPool(size int32) *Pool {
if size <= 0 {
size = defaultCapacity
}
p := &Pool{
capacity: size,
workers: newWorkerStack(size),
lock: NewSpinLock(),
}
p.cond = sync.NewCond(p.lock)
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), 1),
}
}
go p.purgeStableWorkers()
return p
}
func (p *Pool) submit(task func()) error {
if w := p.getWorker(); w != nil {
w.inputFunc(task)
fmt.Println("放入成功")
return nil
}
fmt.Println("放入失败")
return fmt.Errorf("放入失败")
}
func (p *Pool) getWorker() (w worker) {
// 这里只有启动的时候会执行,后续基本不会执行,因为都是直接从队列中获取worker
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
//
p.lock.Lock()
w = p.workers.detach()
if w != nil {
fmt.Println("获得worker")
p.lock.Unlock()
return w
} else if p.running < p.capacity {
fmt.Println("启动")
p.lock.Unlock()
spawnWorker()
} else {
if p.isClosed() {
p.lock.Unlock()
return
}
fmt.Println("开始重试")
retry:
// 阻塞和重试
p.addWaiting(1)
p.cond.Wait()
p.addWaiting(-1)
if p.isClosed() {
p.lock.Unlock()
return
}
if w = p.workers.detach(); w == nil {
fmt.Println("没有获得任务")
if p.free() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
fmt.Println("获得任务成功")
p.lock.Unlock()
}
return
}
func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}
func (p *Pool) revertWorker(w *goWorker) bool {
fmt.Println("恢复worker")
if p.capacity < p.running || p.isClosed() {
p.cond.Broadcast()
fmt.Println("已关闭1")
return false
}
p.lock.Lock()
if p.isClosed() {
fmt.Println("已关闭1")
p.lock.Unlock()
return false
}
w.lastUsed = time.Now().Unix()
if err := p.workers.insert(w); err != nil {
p.lock.Unlock()
return false
}
p.cond.Signal()
p.lock.Unlock()
return true
}
// isClosed 是否关闭
func (p *Pool) isClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSE
}
func (p *Pool) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}
// free 当前池子是否有空闲
func (p *Pool) free() bool {
return p.capacity-p.running > 0
}
func (p *Pool) purgeStableWorkers() {
//ctx, _ := context.WithCancel(context.Background())
ticker := time.NewTicker(time.Second * 2)
defer func() {
ticker.Stop()
}()
//
for {
select {
case <-ticker.C:
}
fmt.Println("开始清楚")
// 开始执行主逻辑
var isDormant bool
p.lock.Lock()
stableWorker := p.workers.refresh(time.Now().Unix() - expireSecond)
fmt.Printf("空闲数量:%d\n", len(stableWorker))
n := p.running
isDormant = n == 0 || len(stableWorker) == int(n)
p.lock.Unlock()
for i := range stableWorker {
stableWorker[i].finish()
stableWorker[i] = nil
}
// 全部空闲
if isDormant && p.waiting > 0 {
fmt.Println("全部空闲")
p.cond.Broadcast()
}
}
}
自旋锁:对worker进行操作时需要上锁,因为切片不是线程安全的
package main
import (
"runtime"
"sync"
"sync/atomic"
)
type spinLock uint32
const maxBackoff = 16
// Lock 加锁 使用CAS将0换成1 成功了就加锁成功,否则一直重试,重试次数从1增加到16
func (sl *spinLock) Lock() {
backOff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backOff; i++ {
runtime.Gosched()
}
if backOff < maxBackoff {
backOff <<= 1 // 左移1位
}
}
}
// Unlock 解锁,CAS将1改成0
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
func NewSpinLock() sync.Locker {
return new(spinLock)
}
具体的worker实现
package main
import (
"fmt"
)
type goWorker struct {
// pool who owns this worker.
pool *Pool
// task is a job should be done.
task chan func()
// lastUsed will be updated when putting a worker back into queue.
lastUsed int64
}
func (w *goWorker) inputFunc(fn func()) {
w.task <- fn
}
func (w *goWorker) run() {
w.pool.addRunning(1)
fmt.Printf("当前运行的go:%d\n", w.pool.running)
go func() {
defer func() {
w.pool.addRunning(-1)
w.pool.workerCache.Put(w)
w.pool.cond.Signal() // 唤醒一个协程
fmt.Println("任务结束----------------")
}()
// 在此遍历执行task
for task := range w.task {
if task == nil {
return
}
task()
if ok := w.pool.revertWorker(w); !ok {
return
}
//time.Sleep(time.Second * 5)
}
}()
}
// finish 往task中放入一个nil,代表本次worker结束
func (w *goWorker) finish() {
w.task <- nil
}
func (w *goWorker) lastUsedTime() int64 {
return w.lastUsed
}
worker接口
package main
// 工作队列interface
type worker interface {
run() // 执行任务
finish() // 强制当前worker完成任务
lastUsedTime() int64
inputFunc(func())
//inputParam(interface{})
}
type workerQueue interface {
len() int
//isEmpty() bool
insert(worker) error
detach() worker
refresh(duration int64) []worker // clean up the stale workers and return them
//reset()
}
队列类型的worker
package main
import (
"fmt"
)
type workerStack struct {
items []worker
expiry []worker
}
func newWorkerStack(size int32) *workerStack {
return &workerStack{
items: make([]worker, 0, size),
}
}
// insert 任务完成后,将worker放回队列中
func (ws *workerStack) insert(w worker) error {
ws.items = append(ws.items, w)
fmt.Printf("当前insert数量:%d\n", len(ws.items))
return nil
}
// detach 每次任务都要先从这里获取,获取不到的话就从池中获取或者循环从这里获取
func (ws *workerStack) detach() worker {
l := len(ws.items)
if l <= 0 {
return nil
}
w := ws.items[l-1]
ws.items[l-1] = nil
ws.items = ws.items[:l-1]
return w
}
// refresh 将空闲worker回收
func (ws *workerStack) refresh(expireTime int64) []worker {
n := ws.len()
if n <= 0 {
return nil
}
index := ws.binarySearch(0, n-1, expireTime)
ws.expiry = ws.expiry[:0] // 重置
if index != -1 {
ws.expiry = append(ws.expiry, ws.items[:index+1]...)
m := copy(ws.items, ws.items[index+1:]) // 把未过期的worker拷贝到数组前面
for i := m; i < n; i++ {
ws.items[m] = nil
}
ws.items = ws.items[:m]
}
return ws.expiry
}
func (ws *workerStack) len() int {
return len(ws.items)
}
func (ws *workerStack) binarySearch(left, right int, expireTime int64) int {
fmt.Printf("过期时间:%d", expireTime)
for left <= right {
mid := (left + right) >> 1
fmt.Println(ws.items[mid].lastUsedTime())
if ws.items[mid].lastUsedTime() > expireTime {
right = mid - 1
} else {
left = mid + 1
}
}
fmt.Println(right)
return right
}
网友评论