美文网首页
Task+ConcurrentQueue+Parallel多线程

Task+ConcurrentQueue+Parallel多线程

作者: 码嘟嘟 | 来源:发表于2019-12-24 17:14 被阅读0次

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Collections.Concurrent;

    using System.Threading;

    using System.Threading.Tasks;

    using Common.Tool.Net;

    using Common.Tool;

    namespace ConsoleApplication1

    {

        class Program

        {

            static string path = "D://Logs//" + DateTime.Now.ToString("yyyy-MM-dd") + "//";

            static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

            static void Main(string[] args)

            {

                TaskHelper.GetTaskInitialBasics<int>(path, queue, IntoQueue, ScanQueue);

            }

            private static void IntoQueue(CancellationToken ct)

            {

                Console.WriteLine("入队线程启动");

                List<int> list = new List<int>();

                for (int i = 1; i <= 10000; i++)

                {

                    list.Add(i);

                }

                //使用并行迭代提升速度         

                Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = 500 }, l =>

                {

                    //入队操作

                    queue.Enqueue(l);

                });

                Console.WriteLine("入队线程结束");

                Utils.WriteLog(path, "===========入队线程结束!===========", "Result", false, true);

            }

            private static void ScanQueue(CancellationToken ct)

            {

                ct.ThrowIfCancellationRequested();

                Console.WriteLine("出队线程启动");

                while (true)

                {

                    if (!queue.IsEmpty)

                    {

                        //从队列中取出 

                        int a = 0;

                        if (queue.TryDequeue(out a))

                        {

                            Console.WriteLine("正在录入数据");

                            Utils.WriteLog(path, a.ToString(), "demo", false, true);

                        }

                    }

                    else

                    {

                        Thread.Sleep(1500);

                    }

                    Console.WriteLine("队列剩余总数:" + queue.Count);

                    ct.ThrowIfCancellationRequested();

                }

            }

        }

    }

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading;

    using System.Threading.Tasks;

    using System.Collections.Concurrent;

    namespace Common.Tool.Net

    {

        public class TaskHelper

        {

            public delegate void IntoQueueMethod(CancellationToken token);

            public delegate void ScanQueueMethod(CancellationToken token);

            /// <summary>

            /// task初始化基础设置

            /// </summary>

            /// <typeparam name="T"></typeparam>

            /// <param name="logPath"></param>

            /// <param name="queue"></param>

            /// <param name="MakeIntoQueueMethod"></param>

            /// <param name="MakeScanQueueMethod"></param>

            public static void GetTaskInitialBasics<T>(string logPath, ConcurrentQueue<T> queue, IntoQueueMethod MakeIntoQueueMethod, ScanQueueMethod MakeScanQueueMethod)

            {

                CancellationTokenSource token = new CancellationTokenSource();

                var task = Task.Factory.StartNew(() => MakeIntoQueueMethod(token.Token));

                var task1 = Task.Factory.StartNew(() => MakeScanQueueMethod(token.Token));

                Thread.Sleep(1000);

                if (task.IsFaulted)

                {

                    /*  循环输出异常    */

                    foreach (Exception ex in task.Exception.InnerExceptions)

                    {

                        Console.WriteLine(ex.Message);

                        Utils.WriteLog(logPath, ex.Message, "IntoQueueError", false, true);

                    }

                }

                if (task1.IsFaulted)

                {

                    /*  循环输出异常    */

                    foreach (Exception ex in task1.Exception.InnerExceptions)

                    {

                        Console.WriteLine(ex.Message);

                        Utils.WriteLog(logPath, ex.Message, "ScanQueueError", false, true);

                    }

                }

                Task.WaitAll(task); //等待入队任务结束

                Thread.Sleep(5000);

                while (true)

                {

                    if (queue.Count == 0 && queue.IsEmpty) //队列为空

                    {

                        token.Cancel(); //发送取消任务指令

                        Console.WriteLine("出队线程结束");

                        Utils.WriteLog(logPath, "===========出队线程结束!===========", "Result", false, true);

                        Console.WriteLine("===========全部录入完成!===========");

                        Utils.WriteLog(logPath, "===========全部录入完成!===========", "Result", false, true);

                        break;

                    }

                }

                Thread.Sleep(2000);

                Console.WriteLine("若要结束任务请输入T");

                if (Console.ReadLine() == "T")

                {

                    task.Dispose();

                    task1.Dispose();

                }

            }

        }

    }

    相关文章

      网友评论

          本文标题:Task+ConcurrentQueue+Parallel多线程

          本文链接:https://www.haomeiwen.com/subject/dsyfoctx.html