Limit action execution max count per period.
工作中, 有很多地方需要限制一段时间内, 对某个方法的调用.
比如某某API 限制你每分钟只能请求 600次, 怎么办呢?? 一次请求的耗时, 要看对方的响应速度, 服务器的网速, 等多方面的综合因素. 所以不好估算每秒,每分钟我能调用多少次, 只能一次次的去估算, 把并发数调低, 请求间隔调长. 调低, 调长, 会拉长执行间; 调高,调短 会被限制频率, 很是头疼...
疫情过后, 一地鸡毛, 别人复工, 我们轮流在家休假. 上天终于给我安排了时间面对这个问题了.
一地鸡毛
合理安排任务
是周期性的把一批任务压进内存, 还是视任务队列的执行情况, 在决定要压进多少新任务呢??
假设每个任务要执行2秒, 而每1秒可以执行100次, 那么完成这100个任务就需要2秒.
如果不考虑任务的执行情况, 在第二秒的时候, 任务队列里就会有200个任务在处理.
就好比高铁, 每一站下多少人, 就可以在卖几张票, 而不是不管下了多少人, 都按整列车的座位数重新售票.
前一秒的还没有执行完, 后一秒的又进来了, 这是要累死牛的节凑, 人都不干, 更何况机器, 如果真这样, 内存/CPU 迟早要完玩...
抢票
处理并发
秉承最短时间内,做最多的工作的原则, 我们可以认为: 一个周期内, 最大可执行的次数, 就是周期内的最大并发数.
如果想限制最大任务并行数, 可以用 SemaphoreSlim
或 Semaphore
.
但是如果想限制一段时间内, 最大任务执行次数, 用 (欠考虑, 其实是和 Semaphore
就不好办了, 因为不能确定每个任务啥时间运行完.BlockingCollection<T>
是一样的,只不过需要手动 release 而已。)
那要怎么限制并发呢??
BlockingCollection<T>
这个是什么不需要我解释, 我们用它来模拟那辆高铁的座位: 位置是有限的, 下去几个人就可以上几个人, 多了上不了,买票请排队. 但是这个对象不能用来存放 Task, 因为哪个 Task 先完成, 哪个 Task 后完成不是可以安排的, 只有当任务完成后, 才能释放出来一个空位, 就如同高铁上的座位只有等乘客下车才能释放出来, 而不是一上车,说我到哪哪下之后就可以释放出来. 所以这个 BlockingCollection<T>
只用来存占位符, 真正的任务队列我们另请高明.
本着先进先出的原则, 我们用 ConcurrentQueue<T>
来存放任务队列.
/// <summary>
/// 用于限速的阻止队列, 如果空不足,插入操作就会等待.
/// </summary>
private readonly BlockingCollection<int> block;
/// <summary>
/// 真正的任务队列
/// </summary>
private readonly ConcurrentQueue<Task> tsks = new ConcurrentQueue<Task>();
有票才能上车
上车的时候, 需要在阻止队列中插入一个占位符, 占位符插入成后, 才能把任务添加到任务队列中.
//占用一个空间, 如果空间占满, 会无限期等待,直至有空间释放出来
this.block.Add(0);
//Console.WriteLine($"{DateTime.Now}..................Add");
//占用一个空间后, 才能将任务插入队列
this.tsks.Enqueue(task);
上车之前, 还要对任务做一下扩展: 下车的时候, 把你占用的位置释放出来
_tsk.ContinueWith(tt =>
{
//当任务执行完时, 才能阻止队列的一个空间出来,供下一个任务进入
this.block.TryTake(out _);
Interlocked.Increment(ref this._totalExecuted);
//Console.WriteLine($"{DateTime.Now}..................Release");
}, TaskContinuationOptions.AttachedToParent);
帽子戏法
如果一个任务是 Task<Task>
或 Task<Task<T>>
, 你一定要对它 Unwrap
, 否则一眨眼, 你的任务就执行完了, 下车回头一看, 才发现: 我艹, 我包呢?? 你的包(子任务)还在等待着开往春天的地铁呢.
if (task is Task<Task> t)
{
//如果 task 是 task 的嵌套, 需要先解包装
_tsk = t.Unwrap();
}
但是 task is Task<Task<>>
(语法错误) 还是task is Task<object>
(类型不匹配) 呢?? 怎么都不对, 这个时候才发现 C# 太TMD 的不地道了, 这么简单的车, 还需要我在兜一圈.
...
else if (task is IUnwrap tt)
{
_tsk = tt.GetUnwrapped();
}
...
public interface IUnwrap
{
Task GetUnwrapped();
}
public class WrapFuncTask<T> : Task<Task<T>>, IUnwrap
{
...
public Task GetUnwrapped()
{
return this.Unwrap();
}
}
差不多了, 开车
先加个开车提醒:
this.timer = new System.Timers.Timer(period.TotalMilliseconds)
{
AutoReset = true
};
this.timer.Elapsed += Timer_Elapsed;
this.timer.Start();
...
private void Timer_Elapsed(object sender, ElapsedEventArgs e)
{
this._currentCount = 0;
this.PeriodNumber++;
this.TryProcessQueue();
}
开车了....
/// <summary>
///
/// </summary>
private void ProcessQueue()
{
//当 当前计数 小于周期内最大允许的任务数
//且 任务队列中有任务可以取出来
while ((this._currentCount < this.MaxCountPerPeriod)
&& tsks.TryDequeue(out Task tsk))
{
//Console.WriteLine($"{DateTime.Now}..................Dequeue");
Interlocked.Increment(ref this._currentCount);
//执行任务
tsk.Start();
}
}
开车啦
戳: 代码 Gist 地址
网友评论