美文网首页
{C#} How task works?

{C#} How task works?

作者: windflow | 来源:发表于2016-08-17 14:15 被阅读0次

    .net 从4.5开始对 Task 有了良好的支持。可以很方便的创建任务:

    Task.Factory.StartNew(func<T>);
    

    内部逻辑类似于:

    var task = new Task<T>(func, ...);
    task.ScheduleAndStart(...);
    

    那么当我们写下这行代码时,究竟发生了什么?
    首先,任务在创建时可以指定任务调度器,如果不提供将采用默认的线程池调度器(ThreadPoolTaskScheduler)。当 task 启动的时候,会把自身投递到任务管理器中。

    m_taskScheduler.InternalQueueTask(this);
    
    internal void InternalQueueTask(Task task)
    {
        Contract.Requires(task != null);
    
        task.FireTaskScheduledIfNeeded(this);
    
        this.QueueTask(task);
    }
    

    接着再看 ThreadPoolTaskScheduler.QueueTask(Task):

    protected internal override void QueueTask(Task task)
    {
        if ((task.Options & TaskCreationOptions.LongRunning) != 0)
        {
            // Run LongRunning tasks on their own dedicated thread.
            Thread thread = new Thread(s_longRunningThreadWork);
            thread.IsBackground = true; // Keep this thread from blocking process shutdown
            thread.Start(task);
        }
        else
        {
            // Normal handling for non-LongRunning tasks.
            bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
            ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
        }
    }
    

    ThreadPoolTaskScheduler 其实是对线程池的封装。不过,对于【暗示将要】长时间运行的任务,为避免线程池阻塞,将直接创建新的线程。

    ThreadPool.UnsafeQueueCustomWorkItem:

    internal static void UnsafeQueueCustomWorkItem(IThreadPoolWorkItem workItem, bool forceGlobal)
    {
        Contract.Assert(null != workItem);
        EnsureVMInitialized();
    
        //
        // Enqueue needs to be protected from ThreadAbort
        //
        try { }
        finally
        {
            ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal);
        }
    }
    

    很简单,就是将任务加入到任务队列中。注意 Task,Task<T>,QueueUserWorkItemCallback 都实现了 IThreadPoolWorkItem 接口。

    ThreadPoolGlobals.workQueue.Enqueue:

    public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
    {
        ThreadPoolWorkQueueThreadLocals tl = null;
        if (!forceGlobal)
            tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
    
        if (loggingEnabled)
            System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
        
        if (null != tl)
        {
            tl.workStealingQueue.LocalPush(callback);
        }
        else
        {
            QueueSegment head = queueHead;
    
            while (!head.TryEnqueue(callback))
            {
                Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);
    
                while (head.Next != null)
                {
                    Interlocked.CompareExchange(ref queueHead, head.Next, head);
                    head = queueHead;
                }
            }
        }
    
        EnsureThreadRequested();
    }
    

    注意这段代码里的细节。

    1. 如果 Task 是一个 Top Task(从主线程或者其他线程创建的),ThreadPoolWorkQueueThreadLocals(ThreadStatic) 不会被初始化,Task 将
      直接投递到全局队列中。
    2. 如果 TaskB 是在 TaskA 中创建的,TaskB 会被插入到 TaskA 当前线程的工作队列。因为在 TaskA 的当前线程中,ThreadPoolWorkQueueThreadLocals 已经建立(后面会讲到为什么)。
    3. EnsureThreadRequested 会请求必要的线程(如果没有超过最大可用线程)。具体创建线程的代码不可见。

    插入队列的过程就到这里结束了。 可是,,我们的task什么时候能跑起来?
    这就涉及到另一方面,线程是怎么从队列里获取任务的。线程大概会执行ThreadPool.Dispatch,在这个方法中首先会创建自身的队列,然后获取任务并执行。
    获取任务的算法比较有意思,这里贴出重要的部分。
    ThreadPoolGlobals.workQueue.Dequeue(tl,...);

    public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
    {
        callback = null;
        missedSteal = false;
        WorkStealingQueue wsq = tl.workStealingQueue;
    
        if (wsq.LocalPop(out callback))
            Contract.Assert(null != callback);
    
        if (null == callback)
        {
            QueueSegment tail = queueTail;
            while (true)
            {
                if (tail.TryDequeue(out callback))
                {
                    Contract.Assert(null != callback);
                    break;
                }
    
                if (null == tail.Next || !tail.IsUsedUp())
                {
                    break;
                }
                else
                {
                    Interlocked.CompareExchange(ref queueTail, tail.Next, tail);
                    tail = queueTail;
                }
            }
        }
    
        if (null == callback)
        {
            WorkStealingQueue[] otherQueues = allThreadQueues.Current;
            int i = tl.random.Next(otherQueues.Length);
            int c = otherQueues.Length;
            while (c > 0)
            {
                WorkStealingQueue otherQueue = Volatile.Read(ref otherQueues[i % otherQueues.Length]);
                if (otherQueue != null &&
                    otherQueue != wsq &&
                    otherQueue.TrySteal(out callback, ref missedSteal))
                {
                    Contract.Assert(null != callback);
                    break;
                }
                i++;
                c--;
            }
        }
    }
    

    线程先从本身队列里获取任务,然后从全局队列获取,最后从其他线程的队列里偷任务。

    值得注意的是,全局队列是FIFO,本地队列是LIFO。两者都采用了轻量锁机制,尤其是全局队列,设计得非常巧妙。

    接下来说说TaskA中创建TaskB的情况。TaskB在创建时如果没有特别指定TaskScheduler,将使用TaskA的TaskScheduler。记住,是在TaskA运行中的线程启动TaskB,而此线程在分发之时就已经创建了本地队列,根据ThreadPoolGlobals.workQueue.Enqueue算法,可以知道TaskB会被插入到本地队列。所以,你应该知道如何利用Task这一特性来有针对性的创建和启动task。

    相关文章

      网友评论

          本文标题:{C#} How task works?

          本文链接:https://www.haomeiwen.com/subject/vnumsttx.html