美文网首页
channel+select批量处理请求

channel+select批量处理请求

作者: 彳亍口巴 | 来源:发表于2021-12-31 16:07 被阅读0次

    背景

    在我们的服务中,通常会调用到其他的服务,比如通过账号uin从别的服务拉取数据,别人通常会提供单条数据拉取和批量拉取的情况,但是我们的请求一次可能只有一个账号uin,此时只能单条拉取,当QPS较高的时候,对别人的服务造成的压力也会很大,最好的方法还是通过批量去拉取,但是我们的接口有超时时间,不可能等数据凑到一批才去拉取,应该及时返回对应的结果。
    我们可以本地建立一个通道,请求到来的时候先将请求放到通道中,并给接口返回正确的结果,若数据不是很重要,可以丢弃一部分,可以在通道满的时候将数据丢弃。然后同时起多个协程一直从通道中拉取数据,只要协程数量和通道大小设置的合理,就可以解决问题了。

    代码如下:

    package task
    
    import (
        "time"
    )
    
    type Task interface {
        // HandleTask batchSize:批量次数的次数  duration:多久执行一次   timer:超时时间,超过这个时间的话会退出
        HandleTask(batchSize int, duration time.Duration, timer *time.Timer)
    }
    
    
    
    package task
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    var defaultTaskContainer = &TaskContainer{}
    
    type TaskInfo struct {
        batchSize         int           // 批量处理
        duration          time.Duration // 超时时间
        task              Task          // 具体执行的任务,实现这个接口即可
        activeThreadCount int           // 支持同时执行的线程数
    }
    
    type TaskContainer struct {
        tasks []*TaskInfo
    }
    
    func (obj *TaskContainer) appendTask2Container(batchSize int, duration time.Duration, task Task, activeThreadCount int) {
        obj.tasks = append(obj.tasks, &TaskInfo{batchSize: batchSize, duration: duration, task: task, activeThreadCount: activeThreadCount})
    }
    
    func (obj *TaskContainer) Run() {
        fmt.Println("进来")
        for i := range obj.tasks {
            doBatch(10, 10, func(index int) {
                fmt.Println("开始执行任务")
    
                for {
                    //time.Sleep(time.Second)
                    obj.tasks[i].task.HandleTask(obj.tasks[i].batchSize, obj.tasks[i].duration, time.NewTimer(obj.tasks[i].duration))
                }
            })
        }
    }
    
    // doBatch 启动activeThreadCount或max个线程
    func doBatch(max, activeThreadCount int, execFun func(index int)) {
        if activeThreadCount > max {
            activeThreadCount = max
        }
        wg := sync.WaitGroup{}
        workLimitChannel := make(chan struct{}, activeThreadCount)
        for i := 0; i < max; i++ {
            select {
            case workLimitChannel <- struct{}{}:
                fmt.Println("可以执行")
                wg.Add(1)
                go func(i int) {
                    defer func() {
                        <-workLimitChannel
                        wg.Done()
                    }()
                }(i)
                execFun(i)
            }
        }
        wg.Wait()
    }
    
    func main() {
        //go defaultTaskContainer.Run()
        fmt.Println("开始")
        rand.Seed(time.Now().UnixNano())
        go func() {
            for {
                select {
                case datas <- rand.Int():
                    fmt.Println("放入成功")
                    time.Sleep(time.Millisecond * 10)
                default:
                    fmt.Println("满了,算了")
                }
            }
        }()
        time.Sleep(time.Hour)
    }
    
    
    
    package task
    
    import (
        "fmt"
        "time"
    )
    
    var datas = make(chan int, 1000)
    var defaultTaskHandler = &taskHandler{}
    
    type taskHandler struct {
    }
    
    func init() {
        defaultTaskContainer.appendTask2Container(100, time.Second, defaultTaskHandler, 3)
    }
    
    func (obj *taskHandler) HandleTask(batchSize int, duration time.Duration, timer *time.Timer) {
        timer.Reset(duration)
        var arr []int
    Batch:
        for i := 0; i < batchSize; i++ {
            //time.Sleep(time.Millisecond * 200)
            select {
            case data, ok := <-datas:
                if !ok {
                    continue
                }
                arr = append(arr, data)
            case <-timer.C:
                fmt.Println("时间到")
                break Batch
            default:
                fmt.Println("default")
            }
        }
        //
        if len(arr) <= 0 {
            return
        }
        fmt.Println("开始处理数组数据:", arr)
    }
    
    
    
    package task
    
    import (
        "fmt"
        "math/rand"
        "testing"
        "time"
    )
    
    func TestTaskContainer_Run(t *testing.T) {
        go defaultTaskContainer.Run()
    
        fmt.Println("开始")
        rand.Seed(time.Now().UnixNano())
        go func() {
            for {
                select {
                case datas <- rand.Int():
                    fmt.Println("放入成功")
                    //time.Sleep(time.Millisecond * 1000)
                default:
                    fmt.Println("满了,算了")
                }
            }
        }()
        time.Sleep(time.Hour)
    }
    
    

    相关文章

      网友评论

          本文标题:channel+select批量处理请求

          本文链接:https://www.haomeiwen.com/subject/nangqrtx.html