美文网首页.NET看场电影ndk
64行C# 代码实现异步队列

64行C# 代码实现异步队列

作者: 冰麟轻武 | 来源:发表于2018-06-03 16:40 被阅读353次

    不喜欢看废话的同学直接跳到 看代码

    一、

    有时候我们会有这样的需求:
    一个或多个线程(Senders)向一个队列(FIFO)中写入数据,
    另外一个或多个线程(Receivers)从这个队列中取数据,并对数据进行处理或加工
    这就是异步队列


    图片来自网络

    PS:发送者(sender)/接收者(receiver)有时也被叫做生产者(producer)/消费者(consumer )

    二、

    最近在项目中有使用(本地)异步队列的需求,在网上搜了一圈,都不是很满意:(所以说.NET生态还有待加强)
    一种是通过事件机制来触发,这种方式写出的代码比较“分散”,不易阅读,理解和维护,这种当然不能接受啦,
    另一种是通过“阻塞”模式来优化代码的可读性,代价就是浪费性能,
    拜托,现在都什么年代了,怎么可能阻塞线程呢?当然是使用 C# 5.0 引入的 async/await啦。
    因为搜不到,所以只能自己动手了

    三、

    我们的目标当然是写出这样的代码:

    var x = await queue.Dequeue(cancellationToken);
    

    并且内部的实现必须是非阻塞式的,
    基于这个目标我们需要知道一个知识点信号量

    四、

    信号量简单来说就是对一个资源打上一个数字的标记,
    这个数字(正数)表示了这个资源可以同时被多少个对象访问,(负数)还有多少个对象需要访问他
    打个比方:一支笔,他同时只能被一个人使用,所以我可以初始给他打上一个信号量1
    当第一个小朋友来借笔时,首先观察信号量1(大于0),则表示可以将笔(资源)借(分配)给小朋友(对象),并将信号量-1,此时信号量为0
    第二个小朋友来借笔时,信号量为0,表示需要等待,并将信号量-1,此时信号量为-1(表示有1个对象正在等待资源释放)
    如果这时,第一个小朋友,将笔(资源)归还(释放),则将信号量+1,并将笔借给第二个小朋友,此时信号量为0(表示无等待)
    如果在第一个小朋友还没有将笔归还之前,第二个小朋友表示不愿意再等了,则信号量也-1

    例子2:
    一个小游泳池,可以同时允许10个人一起下水,则初始信号量为10
    第一个人来,信号量-1,得到9,大于等于0,表示可以进去玩
    第二人人来,信号量-1,得到8,大于等于0,表示可以进去玩
    ......
    第十个人来,信号量-1,得到0,大于等于0,表示可以进去玩
    第十一个人来,信号量-1,得到-1,小于0,表示需要等待
    第十二个人来,信号量-1,得到-2,小于0,表示需要等待
    第十三个人来,信号量-1,得到-3,小于0,表示需要等待
    第一个人走了,信号量+1,将第十个人放进去,信号量等于-2,有2个人在等待
    第十二个人走了,信号量+1,信号量等于-1,有1个人在等待

    与信号量的处理相关的还有一个PV操作,了解一下

    五、

    在C#中有专门用于解决信号量问题的类:SemaphoreSemaphoreSlim

    Semaphore:限制可同时访问某一资源或资源池的线程数。
    SemaphoreSlim:对可同时访问资源或资源池的线程数加以限制的 System.Threading.Semaphore 的轻量替代。

    这里我选择更轻量的SemaphoreSlim来实现,他的用法也非常简单

    var s = new SemaphoreSlim(1);         // 计数器初始值1
    await s.WaitAsync(cancellationToken); // 计数器-1,如果计数不足则等待(这个类的设计是计数到0就不会再减少了)
    s.Release();                          // 计数器+1
    

    下面就开始实现一个异步队列

    六、

    先定义一个异步队列的接口

    // 异步队列接口
    public interface IAsyncQueue<T>: IDisposable
    {
        // 清空队列。
        Task Clear(CancellationToken token);
        // 移除并返回位于队列开始处的对象。
        Task<T> Dequeue(CancellationToken token);
        // 将对象添加到队列的结尾处。
        Task Enqueue(T item, CancellationToken token);
    }
    

    定义接口的好处是为了方便写扩展方法和以后对实现的修改

    七、

    定义信号量
    从接口中可以看出,入和出2个操作都是异步的,所以需要定义2个信号量

    private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
    private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
    

    入操作的信号量初始值是1,表示允许1个并发执行
    出操作的信号量初始值是0,因为出操作的信号量是根据队列中的元素个数来决定的,初始队列元素个数为0

    定义一个内部队列,用于实现队列的基本操作

    private readonly Queue<T> _queue = new Queue<T>();
    

    实现类定义:

    // 异步消息队列实现
    sealed class AsyncQueue<T> : IAsyncQueue<T>
    {
        // 内部队列实例
        private readonly Queue<T> _queue = new Queue<T>();
        // 入操作信号量
        private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
        // 出操作信号量
        private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
    
        public Task Clear(CancellationToken token) => throw new NotImplementedException();
        public Task<T> Dequeue(CancellationToken token) => throw new NotImplementedException();
        public Task Enqueue(T item, CancellationToken token) => throw new NotImplementedException();
        public void Dispose() => throw new NotImplementedException();
    }
    

    八、

    入(Enqueue)操作

    public async Task Enqueue(T item, CancellationToken token)
    {
        await _in.WaitAsync(token); // 入操作信号量-1,并发时等待,只允许一个线程操作
        try
        {
            _queue.Enqueue(item);   // 将对象放入队列
            _out.Release();         // “出”操作信号量+1
        }
        finally
        {
            _in.Release();          // 如果Wait操作完成,则必须将信号量施放
        }
    }
    

    出(Dequeue)操作

    public async Task<T> Dequeue(CancellationToken token)
    {
        await _out.WaitAsync(token);  // 同上,出操作比较简单就不赘述了
        return _queue.Dequeue();
    }
    

    清空(Clear)操作

    public async Task Clear(CancellationToken token)
    {
        await _in.WaitAsync(token);  // 先占中入操作的资源,防止操作中插入新的对象
        try
        {
            // 循环调用出操作的Wait,将信号量减为0
            // WaitAsync(100)表示每次操作等待100毫秒,为了防止另一个线程将`_out`的最后一个资源抢先领取后,清空操作无限期等待
            while (await _out.WaitAsync(100) || _out.CurrentCount > 0) 
            {
            }
            _queue.Clear();
        }
        finally
        {
            _in.Release();
        }
    }
    

    九、
    完整代码:

    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace blqw
    {
        sealed class AsyncQueue<T> : IAsyncQueue<T>
        {
            private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
            private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
            private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
    
            public async Task Clear(CancellationToken token)
            {
                await _in.WaitAsync(token);
                try
                {
                    while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
                    {
                        _queue.TryDequeue(out _);
                    }
                }
                finally
                {
                    _in.Release();
                }
            }
    
            public async Task<T> Dequeue(CancellationToken token)
            {
                await _out.WaitAsync(token);
                return _queue.TryDequeue(out var val) ? val : throw new System.InvalidOperationException();
            }
    
            public async Task Enqueue(T item, CancellationToken token)
            {
                await _in.WaitAsync(token);
                try
                {
                    _queue.Enqueue(item);
                    _out.Release();
                }
                finally
                {
                    _in.Release();
                }
            }
    
            void DisposeSemaphoreSlim(SemaphoreSlim ss)
            {
                try
                {
                    ss.Dispose();
                }
                catch { }
            }
    
            public void Dispose()
            {
                DisposeSemaphoreSlim(_in);
                DisposeSemaphoreSlim(_out);
            }
        }
    }
    

    64行

    十、

    工厂类

    /// <summary>
    /// 异步队列
    /// </summary>
    public static class AsyncQueue
    {
        public static IAsyncQueue<T> Create<T>() => new AsyncQueue<T>();
    }
    

    不直接公开 AsyncQueue<T> 是考虑到以后方便替换实现类

    拓展类

    public static class AsyncQueueExtensions
    {
        public static Task Clear<T>(this IAsyncQueue<T> aq) => aq.Clear(CancellationToken.None);
    
        public static Task Clear<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
        {
            var source = new CancellationTokenSource(millisecondsTimeout);
            return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
        }
    
        public static Task Clear<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
        {
            var source = new CancellationTokenSource(timeout);
            return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
        }
    
        public static Task<T> Dequeue<T>(this IAsyncQueue<T> aq) => aq.Dequeue(CancellationToken.None);
    
        public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
        {
            using (var source = new CancellationTokenSource(millisecondsTimeout))
            {
                return await aq.Dequeue(source.Token);
            }
        }
    
        public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
        {
            using (var source = new CancellationTokenSource(timeout))
            {
                return await aq.Dequeue(source.Token);
            }
        }
    
        public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item) => aq.Enqueue(item, CancellationToken.None);
    
        public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, int millisecondsTimeout)
        {
            var source = new CancellationTokenSource(millisecondsTimeout);
            return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
        }
    
        public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, TimeSpan timeout)
        {
            var source = new CancellationTokenSource(timeout);
            return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
        }
    
    
        public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items)
        {
            if (items != null)
            {
                foreach (var item in items)
                {
                    await aq.Enqueue(item, CancellationToken.None);
                }
            }
        }
    
        public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, int millisecondsTimeout)
        {
            if (items != null)
            {
                using (var source = new CancellationTokenSource(millisecondsTimeout))
                {
                    foreach (var item in items)
                    {
                        await aq.Enqueue(item, CancellationToken.None);
                    }
                }
            }
        }
    
        public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, TimeSpan timeout)
        {
            if (items != null)
            {
                using (var source = new CancellationTokenSource(timeout))
                {
                    foreach (var item in items)
                    {
                        await aq.Enqueue(item, CancellationToken.None);
                    }
                }
            }
        }
    }
    

    十一、
    现在来测试一下
    为了方便观察测试结果,先写一个将结果改为彩色的类,并且是异步的,不影响测试代码

    static class ColorConsole
    {
        public static void WriteLine(string value, ConsoleColor? backgroundColor = null, ConsoleColor? foregroundColor = null)
        {
            Task.Run(() =>
            {
                lock (typeof(Console))
                {
                    Console.ResetColor();
                    if (backgroundColor != null)
                    {
                        Console.BackgroundColor = backgroundColor.Value;
                    }
                    if (foregroundColor != null)
                    {
                        Console.ForegroundColor = foregroundColor.Value;
                    }
                    Console.WriteLine(value);
                }
            });
        }
    }
    

    发送者:

    class Sender
    {
        private readonly int _index;
        private readonly IAsyncQueue<string> _queue;
        private readonly ConsoleColor _background;
    
        public Sender(int index, IAsyncQueue<string> queue, ConsoleColor background)
        {
            _index = index;
            _queue = queue ?? throw new ArgumentNullException(nameof(queue));
            _background = background;
        }
    
        public async Task Send(string message)
        {
            ColorConsole.WriteLine($"{_index}号发送者写入{message}", backgroundColor: _background);
            await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟模拟实际场景
            await _queue.Enqueue(message);  // 关键代码
        }
    }
    

    接收者

    class Receiver
    {
        private readonly int _index;
        private readonly IAsyncQueue<string> _queue;
        private readonly ConsoleColor _foreground;
    
        public Receiver(int index, IAsyncQueue<string> queue, ConsoleColor foreground)
        {
            _index = index;
            _queue = queue ?? throw new ArgumentNullException(nameof(queue));
            _foreground = foreground;
        }
    
        public async Task Receive(CancellationToken token)
        {
            try
            {
                while (true)
                {
                    var str = await _queue.Dequeue(token); // 关键代码
                    ColorConsole.WriteLine($"{_index}号接收者获取到:{str}", foregroundColor: _foreground);
                    await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟,模拟实际场景
                }
            }
            catch (OperationCanceledException)
            {
                ColorConsole.WriteLine($"{_index}号接收者关闭", foregroundColor: _foreground);
            }
        }
    }
    

    测试类

    static void Main(string[] args)
    {
        var queue = AsyncQueue.Create<string>(); // 初始化异步队列
        var source = new CancellationTokenSource(); // 初始化取消标志
        var token = source.Token; 
        var senders = Enumerable.Range(0, 3).Select(index => new Sender(index, queue, (ConsoleColor)(index+13))).ToArray(); // 初始化3个发送者
        var receivers = Enumerable.Range(0, 10).Select(index => new Receiver(index, queue, (ConsoleColor)(index + 5))).ToArray(); // 初始化10个接收者
    
        Parallel.ForEach(receivers, async x => await x.Receive(token)); // 并行启动10个接收者
    
        Thread.Sleep(1000); // 延迟1秒 等待接收者全部启动完成
        var message = 0;
        // 并行启动3个发送者,每个发送者发送10次,发送内容为从1开始自增的整型数字,也就是1~30
        Parallel.ForEach(senders, async x =>
        {
            for (var i = 0; i < 10; i++)
            {
                await x.Send(Interlocked.Increment(ref message).ToString());
            }
        });
    
        Console.ReadLine();
        source.Cancel(); // 停止所有接收者
        Console.ReadLine();
    }
    

    十二、

    由于整个过程都是异步的,所以打印结果并不会是顺序的

    运行效果

    十三、

    github
    nuget

    十四、

    如果可以帮到你,别忘了帮我点一下喜欢,让更多的人看到


    相关文章

      网友评论

      本文标题:64行C# 代码实现异步队列

      本文链接:https://www.haomeiwen.com/subject/jyrdsftx.html