美文网首页
如何周期性的对任务节流

如何周期性的对任务节流

作者: gruan | 来源:发表于2020-04-16 13:07 被阅读0次

    Limit action execution max count per period.

    工作中, 有很多地方需要限制一段时间内, 对某个方法的调用.
    比如某某API 限制你每分钟只能请求 600次, 怎么办呢?? 一次请求的耗时, 要看对方的响应速度, 服务器的网速, 等多方面的综合因素. 所以不好估算每秒,每分钟我能调用多少次, 只能一次次的去估算, 把并发数调低, 请求间隔调长. 调低, 调长, 会拉长执行间; 调高,调短 会被限制频率, 很是头疼...

    疫情过后, 一地鸡毛, 别人复工, 我们轮流在家休假. 上天终于给我安排了时间面对这个问题了.


    一地鸡毛

    合理安排任务

    是周期性的把一批任务压进内存, 还是视任务队列的执行情况, 在决定要压进多少新任务呢??
    假设每个任务要执行2秒, 而每1秒可以执行100次, 那么完成这100个任务就需要2秒.
    如果不考虑任务的执行情况, 在第二秒的时候, 任务队列里就会有200个任务在处理.
    就好比高铁, 每一站下多少人, 就可以在卖几张票, 而不是不管下了多少人, 都按整列车的座位数重新售票.
    前一秒的还没有执行完, 后一秒的又进来了, 这是要累死牛的节凑, 人都不干, 更何况机器, 如果真这样, 内存/CPU 迟早要完玩...


    抢票

    处理并发

    秉承最短时间内,做最多的工作的原则, 我们可以认为: 一个周期内, 最大可执行的次数, 就是周期内的最大并发数.
    如果想限制最大任务并行数, 可以用 SemaphoreSlimSemaphore .
    但是如果想限制一段时间内, 最大任务执行次数, 用 Semaphore 就不好办了, 因为不能确定每个任务啥时间运行完.(欠考虑, 其实是和 BlockingCollection<T> 是一样的,只不过需要手动 release 而已。)
    那要怎么限制并发呢??

    BlockingCollection<T>

    这个是什么不需要我解释, 我们用它来模拟那辆高铁的座位: 位置是有限的, 下去几个人就可以上几个人, 多了上不了,买票请排队. 但是这个对象不能用来存放 Task, 因为哪个 Task 先完成, 哪个 Task 后完成不是可以安排的, 只有当任务完成后, 才能释放出来一个空位, 就如同高铁上的座位只有等乘客下车才能释放出来, 而不是一上车,说我到哪哪下之后就可以释放出来. 所以这个 BlockingCollection<T> 只用来存占位符, 真正的任务队列我们另请高明.
    本着先进先出的原则, 我们用 ConcurrentQueue<T> 来存放任务队列.

    /// <summary>
    /// 用于限速的阻止队列, 如果空不足,插入操作就会等待.
    /// </summary>
    private readonly BlockingCollection<int> block;
    
    /// <summary>
    /// 真正的任务队列
    /// </summary>
    private readonly ConcurrentQueue<Task> tsks = new ConcurrentQueue<Task>();
    

    有票才能上车

    上车的时候, 需要在阻止队列中插入一个占位符, 占位符插入成后, 才能把任务添加到任务队列中.

    //占用一个空间, 如果空间占满, 会无限期等待,直至有空间释放出来
    this.block.Add(0);
    
    //Console.WriteLine($"{DateTime.Now}..................Add");
    
    //占用一个空间后, 才能将任务插入队列
    this.tsks.Enqueue(task);
    

    上车之前, 还要对任务做一下扩展: 下车的时候, 把你占用的位置释放出来

    _tsk.ContinueWith(tt =>
    {
        //当任务执行完时, 才能阻止队列的一个空间出来,供下一个任务进入
        this.block.TryTake(out _);
        Interlocked.Increment(ref this._totalExecuted);
        //Console.WriteLine($"{DateTime.Now}..................Release");
    
    }, TaskContinuationOptions.AttachedToParent);
    

    帽子戏法

    如果一个任务是 Task<Task>Task<Task<T>> , 你一定要对它 Unwrap, 否则一眨眼, 你的任务就执行完了, 下车回头一看, 才发现: 我艹, 我包呢?? 你的包(子任务)还在等待着开往春天的地铁呢.

    这车不对劲
    if (task is Task<Task> t)
    {
        //如果 task 是 task 的嵌套, 需要先解包装 
        _tsk = t.Unwrap();
    }
    

    但是 task is Task<Task<>> (语法错误) 还是task is Task<object>(类型不匹配) 呢?? 怎么都不对, 这个时候才发现 C# 太TMD 的不地道了, 这么简单的车, 还需要我在兜一圈.

    ...
    else if (task is IUnwrap tt)
    {
        _tsk = tt.GetUnwrapped();
    }
    
    ...
    public interface IUnwrap
    {
        Task GetUnwrapped();
    }
    
    public class WrapFuncTask<T> : Task<Task<T>>, IUnwrap
    {
        ...
    
        public Task GetUnwrapped()
        {
            return this.Unwrap();
        }
    }
    

    差不多了, 开车

    先加个开车提醒:

    this.timer = new System.Timers.Timer(period.TotalMilliseconds)
    {
        AutoReset = true
    };
    this.timer.Elapsed += Timer_Elapsed;
    this.timer.Start();
    
    ...
    private void Timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        this._currentCount = 0;
        this.PeriodNumber++;
    
        this.TryProcessQueue();
    }
    

    开车了....

    /// <summary>
    /// 
    /// </summary>
    private void ProcessQueue()
    {
        //当 当前计数 小于周期内最大允许的任务数
        //且 任务队列中有任务可以取出来
        while ((this._currentCount < this.MaxCountPerPeriod)
            && tsks.TryDequeue(out Task tsk))
        {
            //Console.WriteLine($"{DateTime.Now}..................Dequeue");
            Interlocked.Increment(ref this._currentCount);
            //执行任务
            tsk.Start();
        }
    }
    
    开车啦

    戳: 代码 Gist 地址

    相关文章

      网友评论

          本文标题:如何周期性的对任务节流

          本文链接:https://www.haomeiwen.com/subject/odrrvhtx.html