.net 从4.5开始对 Task 有了良好的支持。可以很方便的创建任务:
Task.Factory.StartNew(func<T>);
内部逻辑类似于:
var task = new Task<T>(func, ...);
task.ScheduleAndStart(...);
那么当我们写下这行代码时,究竟发生了什么?
首先,任务在创建时可以指定任务调度器,如果不提供将采用默认的线程池调度器(ThreadPoolTaskScheduler)。当 task 启动的时候,会把自身投递到任务管理器中。
m_taskScheduler.InternalQueueTask(this);
internal void InternalQueueTask(Task task)
{
Contract.Requires(task != null);
task.FireTaskScheduledIfNeeded(this);
this.QueueTask(task);
}
接着再看 ThreadPoolTaskScheduler.QueueTask(Task):
protected internal override void QueueTask(Task task)
{
if ((task.Options & TaskCreationOptions.LongRunning) != 0)
{
// Run LongRunning tasks on their own dedicated thread.
Thread thread = new Thread(s_longRunningThreadWork);
thread.IsBackground = true; // Keep this thread from blocking process shutdown
thread.Start(task);
}
else
{
// Normal handling for non-LongRunning tasks.
bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
}
}
ThreadPoolTaskScheduler 其实是对线程池的封装。不过,对于【暗示将要】长时间运行的任务,为避免线程池阻塞,将直接创建新的线程。
ThreadPool.UnsafeQueueCustomWorkItem:
internal static void UnsafeQueueCustomWorkItem(IThreadPoolWorkItem workItem, bool forceGlobal)
{
Contract.Assert(null != workItem);
EnsureVMInitialized();
//
// Enqueue needs to be protected from ThreadAbort
//
try { }
finally
{
ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal);
}
}
很简单,就是将任务加入到任务队列中。注意 Task,Task<T>,QueueUserWorkItemCallback 都实现了 IThreadPoolWorkItem 接口。
ThreadPoolGlobals.workQueue.Enqueue:
public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
{
ThreadPoolWorkQueueThreadLocals tl = null;
if (!forceGlobal)
tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
if (loggingEnabled)
System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
if (null != tl)
{
tl.workStealingQueue.LocalPush(callback);
}
else
{
QueueSegment head = queueHead;
while (!head.TryEnqueue(callback))
{
Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);
while (head.Next != null)
{
Interlocked.CompareExchange(ref queueHead, head.Next, head);
head = queueHead;
}
}
}
EnsureThreadRequested();
}
注意这段代码里的细节。
- 如果 Task 是一个 Top Task(从主线程或者其他线程创建的),ThreadPoolWorkQueueThreadLocals(ThreadStatic) 不会被初始化,Task 将
直接投递到全局队列中。 - 如果 TaskB 是在 TaskA 中创建的,TaskB 会被插入到 TaskA 当前线程的工作队列。因为在 TaskA 的当前线程中,ThreadPoolWorkQueueThreadLocals 已经建立(后面会讲到为什么)。
- EnsureThreadRequested 会请求必要的线程(如果没有超过最大可用线程)。具体创建线程的代码不可见。
插入队列的过程就到这里结束了。 可是,,我们的task什么时候能跑起来?
这就涉及到另一方面,线程是怎么从队列里获取任务的。线程大概会执行ThreadPool.Dispatch,在这个方法中首先会创建自身的队列,然后获取任务并执行。
获取任务的算法比较有意思,这里贴出重要的部分。
ThreadPoolGlobals.workQueue.Dequeue(tl,...);
public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
{
callback = null;
missedSteal = false;
WorkStealingQueue wsq = tl.workStealingQueue;
if (wsq.LocalPop(out callback))
Contract.Assert(null != callback);
if (null == callback)
{
QueueSegment tail = queueTail;
while (true)
{
if (tail.TryDequeue(out callback))
{
Contract.Assert(null != callback);
break;
}
if (null == tail.Next || !tail.IsUsedUp())
{
break;
}
else
{
Interlocked.CompareExchange(ref queueTail, tail.Next, tail);
tail = queueTail;
}
}
}
if (null == callback)
{
WorkStealingQueue[] otherQueues = allThreadQueues.Current;
int i = tl.random.Next(otherQueues.Length);
int c = otherQueues.Length;
while (c > 0)
{
WorkStealingQueue otherQueue = Volatile.Read(ref otherQueues[i % otherQueues.Length]);
if (otherQueue != null &&
otherQueue != wsq &&
otherQueue.TrySteal(out callback, ref missedSteal))
{
Contract.Assert(null != callback);
break;
}
i++;
c--;
}
}
}
线程先从本身队列里获取任务,然后从全局队列获取,最后从其他线程的队列里偷任务。
值得注意的是,全局队列是FIFO,本地队列是LIFO。两者都采用了轻量锁机制,尤其是全局队列,设计得非常巧妙。
接下来说说TaskA中创建TaskB的情况。TaskB在创建时如果没有特别指定TaskScheduler,将使用TaskA的TaskScheduler。记住,是在TaskA运行中的线程启动TaskB,而此线程在分发之时就已经创建了本地队列,根据ThreadPoolGlobals.workQueue.Enqueue算法,可以知道TaskB会被插入到本地队列。所以,你应该知道如何利用Task这一特性来有针对性的创建和启动task。
网友评论