本篇分析base::SchedulerSequencedTaskRunner实现。
SchedulerSequencedTaskRunner是SequencedTaskRunner的一个主要实现,主要保证了队列中任务的串行执行:前一个任务执行完成后才会进行下一个任务。譬如Profile中的IOTaskRunner就是一个SchedulerSequencedTaskRunner实例。
分析过程中会牵涉到很多个类,这里分析过程我也不知道怎么说起,直接跳到结论吧:
先简单介绍流程中重要的类
base::Sequence 任务的容器,同一个Sequence中的任务保证串行执行,内部存储结构为std::queue。线程安全。
base::PriorityQueue Sequence的容器,内部存储结构为std::priority_queue,可以取出具有最高优先级的sequence。线程安全。
base::PriorityQueue::Transaction 简单封装了一下对PriorityQueue中锁的操作,通过Transaction对PriorityQueue进行操作。保证当存在一个Transaction实例时就没有其他线程可以读写对应的PriorityQueue。
base::SchedulerWorker 线程池中线程的封装
base::SchedulerWorkerPool 明显就是线程池本身咯
SchedulerSequencedTaskRunner的实现位于base/task_scheduler/scheduler_worker_pool_impl.cc,先看一下它的成员:
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;
const TaskTraits traits_;
SchedulerWorkerPool* const worker_pool_;
很明显,每个SchedulerSequencedTaskRunner会对应一个Sequence实例,从而保证向同一个runner投放的任务之间的串行执行。跟踪一下任务的传递:
bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
std::unique_ptr<Task> task,
scoped_refptr<Sequence> sequence) {
if (!task_tracker_->WillPostTask(task.get()))
return false;
if (task->delayed_run_time.is_null()) {
PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
delayed_task_manager_->AddDelayedTask(...);
}
return true;
}
void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
std::unique_ptr<Task> task,
scoped_refptr<Sequence> sequence) {
const bool sequence_was_empty = sequence->PushTask(std::move(task));
if (sequence_was_empty) {
const auto sequence_sort_key = sequence->GetSortKey();
shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
sequence_sort_key);
// Wake up a worker to process |sequence|.
WakeUpOneWorker();
}
}
任务会先经过TaskTracker,TaskTracker内的逻辑还没有细看,但是WillPostTask里面大致就是判断是否正在退出流程,根据任务的traits是否允许执行。 如果任务有延时属性,会向DelayTaskManager中一个线程消息队列投放一个延时任务,时间到后将Task加入Sequence。如果没有延时属性,Task将直接被加入Sequence中,如果Sequence中原本没有任务,那么把Sequence加入Pool中的PriorityQueue,并唤醒一个空闲的worker(如果有的话)。
接下来再看Worker。Worker线程的入口函数位于scheduler_worker.cc:39,这里贴一点重要逻辑的代码如下:
void ThreadMain() override {
...
while (!outer_->ShouldExit()) {
...
// Get the sequence containing the next task to execute.
scoped_refptr<Sequence> sequence =
outer_->delegate_->GetWork(outer_.get());
if (!sequence) {
if (outer_->delegate_->CanDetach(outer_.get())) {
detached_thread = outer_->DetachThreadObject(DetachNotify::DELEGATE);
if (detached_thread) {
DCHECK_EQ(detached_thread.get(), this);
PlatformThread::Detach(thread_handle_);
break;
}
}
outer_->delegate_->WaitForWork(&wake_up_event_);
continue;
}
if (outer_->task_tracker_->RunTask(sequence->TakeTask(),
sequence->token())) {
outer_->delegate_->DidRunTask();
}
const bool sequence_became_empty = sequence->Pop();
if (!sequence_became_empty)
outer_->delegate_->ReEnqueueSequence(std::move(sequence));
...
}
...
}
scoped_refptr<Sequence>
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
SchedulerWorker* worker) {
...
scoped_refptr<Sequence> sequence;
{
std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
outer_->shared_priority_queue_.BeginTransaction());
if (shared_transaction->IsEmpty()) {
outer_->AddToIdleWorkersStack(worker);
...
return nullptr;
}
sequence = shared_transaction->PopSequence();
}
outer_->RemoveFromIdleWorkersStack(worker);
...
return sequence;
}
分析ThreadMain可以得到以下信息:
- 每次循环会从SchedulerWorkerPool中的优先队列获取一个Sequence,并从中弹出一个任务执行
- 执行完任务后如果Sequence不为空,则将Sequence重新放回WorkerPool中的优先队列
- 空闲线程的生存由SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach控制,主要影响因素为空闲时间,且Pool中会保留一个空闲线程(不会将所有空闲线程都退出)
- 执行任务时的Sequence是从优先队列中取出的,执行完任务后才会重新将Sequence加回优先队列,所以保证了不会有两个工作线程持有同一个Sequence的引用,从而保证了Sequence中的任务是串行执行的
如何保证在多线程的情况下同一队列中的任务串行执行,最直观的方式应该就是执行任务的整个过程中对队列加锁,但是这样会造成执行任务时无法向队列中添加任务,降低了性能。chromium中的做法就避免了这个问题,这个思路可以学习。
下篇准备写一些初始化、退出流程之类的杂七杂八的。
网友评论