背景
在我们的服务中,通常会调用到其他的服务,比如通过账号uin从别的服务拉取数据,别人通常会提供单条数据拉取和批量拉取的情况,但是我们的请求一次可能只有一个账号uin,此时只能单条拉取,当QPS较高的时候,对别人的服务造成的压力也会很大,最好的方法还是通过批量去拉取,但是我们的接口有超时时间,不可能等数据凑到一批才去拉取,应该及时返回对应的结果。
我们可以本地建立一个通道,请求到来的时候先将请求放到通道中,并给接口返回正确的结果,若数据不是很重要,可以丢弃一部分,可以在通道满的时候将数据丢弃。然后同时起多个协程一直从通道中拉取数据,只要协程数量和通道大小设置的合理,就可以解决问题了。
代码如下:
package task
import (
"time"
)
type Task interface {
// HandleTask batchSize:批量次数的次数 duration:多久执行一次 timer:超时时间,超过这个时间的话会退出
HandleTask(batchSize int, duration time.Duration, timer *time.Timer)
}
package task
import (
"fmt"
"math/rand"
"sync"
"time"
)
var defaultTaskContainer = &TaskContainer{}
type TaskInfo struct {
batchSize int // 批量处理
duration time.Duration // 超时时间
task Task // 具体执行的任务,实现这个接口即可
activeThreadCount int // 支持同时执行的线程数
}
type TaskContainer struct {
tasks []*TaskInfo
}
func (obj *TaskContainer) appendTask2Container(batchSize int, duration time.Duration, task Task, activeThreadCount int) {
obj.tasks = append(obj.tasks, &TaskInfo{batchSize: batchSize, duration: duration, task: task, activeThreadCount: activeThreadCount})
}
func (obj *TaskContainer) Run() {
fmt.Println("进来")
for i := range obj.tasks {
doBatch(10, 10, func(index int) {
fmt.Println("开始执行任务")
for {
//time.Sleep(time.Second)
obj.tasks[i].task.HandleTask(obj.tasks[i].batchSize, obj.tasks[i].duration, time.NewTimer(obj.tasks[i].duration))
}
})
}
}
// doBatch 启动activeThreadCount或max个线程
func doBatch(max, activeThreadCount int, execFun func(index int)) {
if activeThreadCount > max {
activeThreadCount = max
}
wg := sync.WaitGroup{}
workLimitChannel := make(chan struct{}, activeThreadCount)
for i := 0; i < max; i++ {
select {
case workLimitChannel <- struct{}{}:
fmt.Println("可以执行")
wg.Add(1)
go func(i int) {
defer func() {
<-workLimitChannel
wg.Done()
}()
}(i)
execFun(i)
}
}
wg.Wait()
}
func main() {
//go defaultTaskContainer.Run()
fmt.Println("开始")
rand.Seed(time.Now().UnixNano())
go func() {
for {
select {
case datas <- rand.Int():
fmt.Println("放入成功")
time.Sleep(time.Millisecond * 10)
default:
fmt.Println("满了,算了")
}
}
}()
time.Sleep(time.Hour)
}
package task
import (
"fmt"
"time"
)
var datas = make(chan int, 1000)
var defaultTaskHandler = &taskHandler{}
type taskHandler struct {
}
func init() {
defaultTaskContainer.appendTask2Container(100, time.Second, defaultTaskHandler, 3)
}
func (obj *taskHandler) HandleTask(batchSize int, duration time.Duration, timer *time.Timer) {
timer.Reset(duration)
var arr []int
Batch:
for i := 0; i < batchSize; i++ {
//time.Sleep(time.Millisecond * 200)
select {
case data, ok := <-datas:
if !ok {
continue
}
arr = append(arr, data)
case <-timer.C:
fmt.Println("时间到")
break Batch
default:
fmt.Println("default")
}
}
//
if len(arr) <= 0 {
return
}
fmt.Println("开始处理数组数据:", arr)
}
package task
import (
"fmt"
"math/rand"
"testing"
"time"
)
func TestTaskContainer_Run(t *testing.T) {
go defaultTaskContainer.Run()
fmt.Println("开始")
rand.Seed(time.Now().UnixNano())
go func() {
for {
select {
case datas <- rand.Int():
fmt.Println("放入成功")
//time.Sleep(time.Millisecond * 1000)
default:
fmt.Println("满了,算了")
}
}
}()
time.Sleep(time.Hour)
}
网友评论