package gpool
type GJobFunc func(arg ...interface{})
type GJob struct {
f GJobFunc
arg []interface{}
}
type GWorker struct {
g *GPool
jobC chan *GJob
done chan interface{}
}
type GPool struct {
workerList []*GWorker
workers chan *GWorker
jobs chan *GJob
done chan interface{}
}
func NewGPool(workerMaxNum, jobMaxNum int) *GPool {
g := &GPool{
workerList: make([]*GWorker, workerMaxNum),
workers: make(chan *GWorker, workerMaxNum),
jobs: make(chan *GJob, jobMaxNum),
done: make(chan interface{}, 1),
}
for i := 0; i < workerMaxNum; i++ {
w := &GWorker{
g: g,
jobC: make(chan *GJob, 1),
done: make(chan interface{}, 1),
}
g.workerList[i] = w
go w.run()
}
go g.dispatch()
return g
}
func (g *GPool) Free() {
go func() {
g.done <- struct{}{}
for _, w := range g.workerList {
w.stop()
}
}()
}
func (g *GPool) AddJob(f GJobFunc, arg ...interface{}) {
g.jobs <- &GJob{
f: f,
arg: arg,
}
}
func (g *GPool) dispatch() {
for {
select {
case job := <-g.jobs:
g.getWorker().putJob(job)
case <-g.done:
return
}
}
}
func (g *GPool) putWorker(w *GWorker) {
g.workers <- w
}
func (g *GPool) getWorker() *GWorker {
return <-g.workers
}
func (w *GWorker) run() {
for {
w.g.putWorker(w)
select {
case job := <-w.jobC:
job.f(job.arg...)
case <-w.done:
return
}
}
}
func (w *GWorker) stop() {
w.done <- struct{}{}
}
func (w *GWorker) putJob(job *GJob) {
w.jobC <- job
}
package test_gpool
import (
"fmt"
"gpool"
"math/rand"
"sync"
"time"
)
func TestGPool() {
wg := &sync.WaitGroup{}
wg.Add(30)
g := gpool.NewGPool(10, 100)
defer g.Free()
for i := 0; i < 30; i++ {
now := time.Now()
g.AddJob(TestTask, now, wg, i, i*30)
}
wg.Wait()
}
func TestTask(arg ...interface{}) {
fmt.Println(time.Now().Format("[2006-01-02 15:04:05.000000]"), arg)
time.Sleep(time.Duration(rand.Intn(10))*time.Second)
wg := arg[1].(*sync.WaitGroup)
wg.Done()
}
package semaphore
import "time"
type Semaphore struct {
channel chan interface{}
}
/* 创建信号量 */
func NewSemaphore(permits int) *Semaphore {
return &Semaphore{channel: make(chan interface{}, permits)}
}
/* 获取许可 */
func (s *Semaphore) Acquire() {
s.channel <- struct{}{}
}
/* 释放许可 */
func (s *Semaphore) Release() {
<-s.channel
}
/* 尝试获取许可 */
func (s *Semaphore) TryAcquire() bool {
select {
case s.channel <- struct{}{}:
return true
default:
return false
}
}
/* 尝试指定时间内获取许可 */
func (s *Semaphore) TryAcquireOnTime(timeout time.Duration) bool {
for {
select {
case s.channel <- struct{}{}:
return true
case <-time.After(timeout):
return false
}
}
}
/* 当前可用的许可数 */
func (s *Semaphore) AvailablePermits() int {
return cap(s.channel) - len(s.channel)
}
网友评论