Dataflow是啥
Dataflow是由微软提供的一个用于异步或者并发的库。是runtime的一部分但不随runtime分发。需要手动添加nuget包·System.Threading.Tasks.Dataflow·。
Dataflow的工作方式
顾名思义,Dataflow库的工作方式就是按数据流的工作方式工作。将你需要做的事情拆分成到各个步骤,然后把步骤连接起来,就构建好了你的数据流。Dataflow将步骤定义为各种的Block。你只需要关心如何构建Block并进行链接,其他的工作交给Dataflow来完成。你不需要去关心数据具体是怎么在各个Block之间传递和缓存,也不用关心如何去给各个模块分配到线程去执行。当然,如何取消执行Dataflow也帮你搞定了。
假设我们的工作需要四个Block来完成,每个Block需要一秒来完成。那么执行一次需要4秒,顺序执行20次需要80秒。如果交给Dataflow来做,需要多久呢?PS:每个Block默认同时只能存在一个实例运行,你可以自行更改。属性是MaxDegreeOfParallelism
执行示意图如图所示,在第四秒开始,四个Block会同时执行,也就是说,理论上只要24秒就可以完成。当然中间会有额外的开销,比如调度啊,数据传递啊。你可能也注意到,时间减少的程度和模块的数量有关。是的,这是因为默认MaxDegreeOfParallelism = 1。更改这个属性或者添加更多的模块会使执行速度更快,但可能会引入其他的问题。
这里也只是举了个简单的例子。实际使用中,Dataflow可以构建更加复杂的数据流,形成一个网络。在业务更加复杂的情况下,Dataflow 的优势更能体现,即在保证基本性能的情况下,提升开发效率。
Dataflow仅适用于特定情况
以我目前对Dataflow 的了解,我认为Dataflow适用于对性能有一定的追求,但不追求极致,同时对灵活性有一定的要求,开发人员的水平又有限的情况。同时任务又需要有一定的复杂度。
我这里举一个简单例子:
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
Console.WriteLine("Hello, World!");
var bufferBlock = new BufferBlock<int[]>();
var transferBlock = new TransformBlock<int[], double[]>(i => { Thread.Sleep(1000); return i.Select(ii => (double)ii).ToArray(); });
var transformBlock1 = new TransformBlock<double[], int>(d => { Thread.Sleep(1000); return (int)d.Sum(); });
var actionBlock = new ActionBlock<int>(i => { Thread.Sleep(1000); Console.WriteLine(i); });
bufferBlock.LinkTo(transferBlock);
bufferBlock.Completion.ContinueWith(delegate { transferBlock.Complete(); });
transferBlock.LinkTo(transformBlock1);
transferBlock.Completion.ContinueWith(delegate { transformBlock1.Complete(); });
transformBlock1.LinkTo(actionBlock);
transformBlock1.Completion.ContinueWith(delegate { actionBlock.Complete(); });
Stopwatch watch = Stopwatch.StartNew();
for (var i = 0; i < 20; ++ i)
{
bufferBlock.Post(new int[1024 * 100]);
}
bufferBlock.Complete();
await actionBlock.Completion;
watch.Stop();
Console.WriteLine($"Dataflow costs {watch.ElapsedMilliseconds}");
// Output: Dataflow costs 22498
watch.Restart();
Parallel.For(0, 20, i =>
{
var data = new int[1024 * 100];
Thread.Sleep(1000);
var tempData = data.Select(i=>(double)i).ToArray();
Thread.Sleep(1000);
var sum = tempData.Sum();
Thread.Sleep(1000);
Console.WriteLine(sum);
});
watch.Stop();
Console.WriteLine($"Parallel costs {watch.ElapsedMilliseconds}");
// Output: Parallel costs 9247
watch.Restart();
for(int i = 0; i < 20; ++ i)
{
var data = new int[1024 * 100];
Thread.Sleep(1000);
var tempData = data.Select(i=>(double)i).ToArray();
Thread.Sleep(1000);
var sum = tempData.Sum();
Thread.Sleep(1000);
Console.WriteLine(sum);
}
watch.Stop();
Console.WriteLine($"Sync costs {watch.ElapsedMilliseconds}");
// Output: Sync costs 61329
可以看到,在不更改默认配置的情况下,我们使用Parallel会更快。当然你可以跟我较真说,只要稍微修改Dataflow就可以达到同样的性能。但不理解Dataflow的工作模式,去更改设置可能会导致很多多线程的bug出现。理解的人呢,可能又有更好的解决方案,不需要用Dataflow。我猜,这大概就是为啥微软不把这个库随runtime发布的原因吧。
网友评论