美文网首页
chromium源码学习——线程池(中)

chromium源码学习——线程池(中)

作者: 丑角的晨歌 | 来源:发表于2018-09-04 21:54 被阅读0次

本篇分析base::SchedulerSequencedTaskRunner实现。
SchedulerSequencedTaskRunner是SequencedTaskRunner的一个主要实现,主要保证了队列中任务的串行执行:前一个任务执行完成后才会进行下一个任务。譬如Profile中的IOTaskRunner就是一个SchedulerSequencedTaskRunner实例。
分析过程中会牵涉到很多个类,这里分析过程我也不知道怎么说起,直接跳到结论吧:


先简单介绍流程中重要的类
base::Sequence 任务的容器,同一个Sequence中的任务保证串行执行,内部存储结构为std::queue。线程安全。
base::PriorityQueue Sequence的容器,内部存储结构为std::priority_queue,可以取出具有最高优先级的sequence。线程安全。
base::PriorityQueue::Transaction 简单封装了一下对PriorityQueue中锁的操作,通过Transaction对PriorityQueue进行操作。保证当存在一个Transaction实例时就没有其他线程可以读写对应的PriorityQueue。
base::SchedulerWorker 线程池中线程的封装
base::SchedulerWorkerPool 明显就是线程池本身咯


SchedulerSequencedTaskRunner的实现位于base/task_scheduler/scheduler_worker_pool_impl.cc,先看一下它的成员:

// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;

const TaskTraits traits_;
SchedulerWorkerPool* const worker_pool_;

很明显,每个SchedulerSequencedTaskRunner会对应一个Sequence实例,从而保证向同一个runner投放的任务之间的串行执行。跟踪一下任务的传递:

bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
    std::unique_ptr<Task> task,
    scoped_refptr<Sequence> sequence) {
  if (!task_tracker_->WillPostTask(task.get()))
    return false;

  if (task->delayed_run_time.is_null()) {
    PostTaskWithSequenceNow(std::move(task), std::move(sequence));
  } else {
    delayed_task_manager_->AddDelayedTask(...);
  }
  return true;
}

void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
    std::unique_ptr<Task> task,
    scoped_refptr<Sequence> sequence) {
  const bool sequence_was_empty = sequence->PushTask(std::move(task));
  if (sequence_was_empty) {
    const auto sequence_sort_key = sequence->GetSortKey();
    shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
                                                    sequence_sort_key);

    // Wake up a worker to process |sequence|.
    WakeUpOneWorker();
  }
}

任务会先经过TaskTracker,TaskTracker内的逻辑还没有细看,但是WillPostTask里面大致就是判断是否正在退出流程,根据任务的traits是否允许执行。 如果任务有延时属性,会向DelayTaskManager中一个线程消息队列投放一个延时任务,时间到后将Task加入Sequence。如果没有延时属性,Task将直接被加入Sequence中,如果Sequence中原本没有任务,那么把Sequence加入Pool中的PriorityQueue,并唤醒一个空闲的worker(如果有的话)。
接下来再看Worker。Worker线程的入口函数位于scheduler_worker.cc:39,这里贴一点重要逻辑的代码如下:

void ThreadMain() override {
  ...

  while (!outer_->ShouldExit()) {
    ...

    // Get the sequence containing the next task to execute.
    scoped_refptr<Sequence> sequence =
        outer_->delegate_->GetWork(outer_.get());
    if (!sequence) {
      if (outer_->delegate_->CanDetach(outer_.get())) {
        detached_thread = outer_->DetachThreadObject(DetachNotify::DELEGATE);
        if (detached_thread) {
          DCHECK_EQ(detached_thread.get(), this);
          PlatformThread::Detach(thread_handle_);
          break;
        }
      }
      outer_->delegate_->WaitForWork(&wake_up_event_);
      continue;
    }

    if (outer_->task_tracker_->RunTask(sequence->TakeTask(),
                                       sequence->token())) {
      outer_->delegate_->DidRunTask();
    }

    const bool sequence_became_empty = sequence->Pop();
    if (!sequence_became_empty)
      outer_->delegate_->ReEnqueueSequence(std::move(sequence));

    ...
  }

  ...
}

scoped_refptr<Sequence>
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
    SchedulerWorker* worker) {
  ...

  scoped_refptr<Sequence> sequence;
  {
    std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
        outer_->shared_priority_queue_.BeginTransaction());

    if (shared_transaction->IsEmpty()) {
      outer_->AddToIdleWorkersStack(worker);
      ...
      return nullptr;
    }

    sequence = shared_transaction->PopSequence();
  }

  outer_->RemoveFromIdleWorkersStack(worker);
  ...
  return sequence;
}

分析ThreadMain可以得到以下信息:

  • 每次循环会从SchedulerWorkerPool中的优先队列获取一个Sequence,并从中弹出一个任务执行
  • 执行完任务后如果Sequence不为空,则将Sequence重新放回WorkerPool中的优先队列
  • 空闲线程的生存由SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach控制,主要影响因素为空闲时间,且Pool中会保留一个空闲线程(不会将所有空闲线程都退出)
  • 执行任务时的Sequence是从优先队列中取出的,执行完任务后才会重新将Sequence加回优先队列,所以保证了不会有两个工作线程持有同一个Sequence的引用,从而保证了Sequence中的任务是串行执行的
    如何保证在多线程的情况下同一队列中的任务串行执行,最直观的方式应该就是执行任务的整个过程中对队列加锁,但是这样会造成执行任务时无法向队列中添加任务,降低了性能。chromium中的做法就避免了这个问题,这个思路可以学习。
    下篇准备写一些初始化、退出流程之类的杂七杂八的。

相关文章

网友评论

      本文标题:chromium源码学习——线程池(中)

      本文链接:https://www.haomeiwen.com/subject/jjziwftx.html