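For a large project, whether WebRTC or an in-house codebase, the threading model matters a great deal once the design is concurrent; to a considerable extent it decides whether the project succeeds. WebRTC's threading model is worth studying in depth. Most write-ups online are based on the m85 branch or even older ones, so this summary is based on the more recent m115 branch. The design ideas are largely unchanged, but the code differs in many places.
In WebRTC a TaskQueue is a thread, and there are two thread wrappers. One is rtc::Thread (itself an implementation of TaskQueueBase), which contains a SocketServer and is used to handle network-related requests; PeerConnection, for example, has network_thread, worker_thread and signaling_thread. The other is TaskQueue, which also wraps a thread and is used widely; audio/video encoding, decoding and rendering each have their own TaskQueue threads. How are these threads implemented, and how do the two kinds differ? The figure borrowed from the web summarizes WebRTC's threading model for reference; later posts will dig further into the WebRTC source.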
[Figure: overview of the WebRTC threading model (image.png)]
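The model has three parts: the API-level TaskQueue interfaces, the concrete TaskQueue implementations, and rtc::Thread.
API-level TaskQueue interfaces
This layer defines the two most important abstract base classes, TaskQueueBase and TaskQueueFactory.
- TaskQueueBase
It implements PostTask, PostDelayedTask, PostDelayedHighPrecisionTask and PostDelayedTaskWithPrecision. The pure virtual PostTaskImpl and PostDelayedTaskImpl are implemented by subclasses, for example TaskQueueWin under rtc_base.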
namespace webrtc {
class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
public:
enum class DelayPrecision {
// This may include up to a 17 ms leeway in addition to OS timer precision.
// See PostDelayedTask() for more information.
kLow,
// This does not have the additional delay that kLow has, but it is still
// limited by OS timer precision. See PostDelayedHighPrecisionTask() for
// more information.
kHigh,
};
virtual void Delete() = 0;
void PostTask(absl::AnyInvocable<void() &&> task,
const Location& location = Location::Current()) {
PostTaskImpl(std::move(task), PostTaskTraits{}, location);
}
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay,
const Location& location = Location::Current()) {
PostDelayedTaskImpl(std::move(task), delay,
PostDelayedTaskTraits{.high_precision = false},
location);
}
void PostDelayedHighPrecisionTask(
absl::AnyInvocable<void() &&> task,
TimeDelta delay,
const Location& location = Location::Current()) {
PostDelayedTaskImpl(std::move(task), delay,
PostDelayedTaskTraits{.high_precision = true},
location);
}
// As specified by `precision`, calls either PostDelayedTask() or
// PostDelayedHighPrecisionTask().
void PostDelayedTaskWithPrecision(
DelayPrecision precision,
absl::AnyInvocable<void() &&> task,
TimeDelta delay,
const Location& location = Location::Current()) {
switch (precision) {
case DelayPrecision::kLow:
PostDelayedTask(std::move(task), delay, location);
break;
case DelayPrecision::kHigh:
PostDelayedHighPrecisionTask(std::move(task), delay, location);
break;
}
}
// Returns the task queue that is running the current thread.
// Returns nullptr if this thread is not associated with any task queue.
// May be called on any thread or task queue, including this task queue.
static TaskQueueBase* Current();
bool IsCurrent() const { return Current() == this; }
protected:
// This is currently only present here to simplify introduction of future
// planned task queue changes.
struct PostTaskTraits {};
struct PostDelayedTaskTraits {
// If `high_precision` is false, tasks may execute within up to a 17 ms
// leeway in addition to OS timer precision. Otherwise the task should be
// limited to OS timer precision. See PostDelayedTask() and
// PostDelayedHighPrecisionTask() for more information.
bool high_precision = false;
};
class RTC_EXPORT CurrentTaskQueueSetter {
public:
explicit CurrentTaskQueueSetter(TaskQueueBase* task_queue);
CurrentTaskQueueSetter(const CurrentTaskQueueSetter&) = delete;
CurrentTaskQueueSetter& operator=(const CurrentTaskQueueSetter&) = delete;
~CurrentTaskQueueSetter();
private:
TaskQueueBase* const previous_;
};
// Subclasses should implement this method to support the behavior defined in
// the PostTask and PostTaskTraits docs above.
virtual void PostTaskImpl(absl::AnyInvocable<void() &&> task,
const PostTaskTraits& traits,
const Location& location) = 0;
// Subclasses should implement this method to support the behavior defined in
// the PostDelayedTask/PostHighPrecisionDelayedTask and PostDelayedTaskTraits
// docs above.
virtual void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
TimeDelta delay,
const PostDelayedTaskTraits& traits,
const Location& location) = 0;
// Users of the TaskQueue should call Delete instead of directly deleting
// this object.
virtual ~TaskQueueBase() = default;
};
struct TaskQueueDeleter {
void operator()(TaskQueueBase* task_queue) const { task_queue->Delete(); }
};
} // namespace webrtc
- TaskQueueFactory
The base class of the factories that create TaskQueues.
namespace webrtc {
// The implementation of this interface must be thread-safe.
class TaskQueueFactory {
public:
// TaskQueue priority levels. On some platforms these will map to thread
// priorities, on others such as Mac and iOS, GCD queue priorities.
enum class Priority { NORMAL = 0, HIGH, LOW };
virtual ~TaskQueueFactory() = default;
virtual std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const = 0;
};
} // namespace webrtc
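To make the TaskQueueBase contract listed earlier more concrete (PostTask, PostDelayedTask, Current() and IsCurrent()), here is a minimal sketch of a queue backed by a single worker thread. It is not WebRTC code: SimpleTaskQueue is a hypothetical name, std::function stands in for absl::AnyInvocable, there is no precision handling, and the thread_local pointer only imitates what CurrentTaskQueueSetter appears to do.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a TaskQueueBase subclass; not WebRTC code.
class SimpleTaskQueue {
 public:
  using Task = std::function<void()>;
  using Clock = std::chrono::steady_clock;

  SimpleTaskQueue() : worker_([this] { Run(); }) {}

  // Runs tasks that are already due, drops pending delayed ones, then joins.
  ~SimpleTaskQueue() {
    assert(!IsCurrent());  // Joining from the worker itself would deadlock.
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quit_ = true;
    }
    wake_.notify_one();
    worker_.join();
  }

  void PostTask(Task task) {
    PostDelayedTask(std::move(task), std::chrono::milliseconds(0));
  }

  void PostDelayedTask(Task task, std::chrono::milliseconds delay) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      // Equal due times keep insertion order, so immediate tasks stay FIFO.
      tasks_.emplace(Clock::now() + delay, std::move(task));
    }
    wake_.notify_one();
  }

  // The queue running the calling thread, or nullptr on any other thread.
  static SimpleTaskQueue* Current() { return current_; }
  bool IsCurrent() const { return Current() == this; }

 private:
  void Run() {
    current_ = this;
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      if (!tasks_.empty() && tasks_.begin()->first <= Clock::now()) {
        Task task = std::move(tasks_.begin()->second);
        tasks_.erase(tasks_.begin());
        lock.unlock();  // Never hold the lock while running user code.
        task();
        lock.lock();
      } else if (quit_) {
        break;
      } else if (tasks_.empty()) {
        wake_.wait(lock);
      } else {
        const Clock::time_point due = tasks_.begin()->first;
        wake_.wait_until(lock, due);
      }
    }
    current_ = nullptr;
  }

  static thread_local SimpleTaskQueue* current_;
  std::mutex mutex_;
  std::condition_variable wake_;
  std::multimap<Clock::time_point, Task> tasks_;  // Ordered by due time.
  bool quit_ = false;
  std::thread worker_;  // Declared last so Run() sees initialized members.
};

thread_local SimpleTaskQueue* SimpleTaskQueue::current_ = nullptr;
```

Keeping the tasks in a container ordered by due time lets one loop serve both immediate and delayed tasks. The platform-specific implementations under rtc_base each use their own event-loop mechanism instead.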
The CreateDefaultTaskQueueFactory interface
default_task_queue_factory.h declares CreateDefaultTaskQueueFactory:
namespace webrtc {
std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
const FieldTrialsView* field_trials = nullptr);
} // namespace webrtc
CreateDefaultTaskQueueFactory is implemented in each of default_task_queue_factory_win.cc, default_task_queue_factory_stdlib.cc, default_task_queue_factory_stdlib_or_libevent_experiment.cc, default_task_queue_factory_libevent.cc and default_task_queue_factory_gcd.cc. The version in default_task_queue_factory_win.cc is shown here; it forwards to CreateTaskQueueWinFactory, which is ultimately implemented in rtc_base/task_queue_win.cc.
// api/task_queue/default_task_queue_factory_win.cc
namespace webrtc {
std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
const FieldTrialsView* field_trials) {
return CreateTaskQueueWinFactory();
}
} // namespace webrtc
// rtc_base/task_queue_win.cc
std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
return std::make_unique<TaskQueueWinFactory>();
}
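CreateDefaultTaskQueueFactory has several implementations, so which one is used? That is decided at build time: the BUILD.gn in api/task_queue picks the implementation according to build flags. For example, Android (when rtc_enable_libevent is set) uses default_task_queue_factory_stdlib_or_libevent_experiment.cc, and Windows uses default_task_queue_factory_win.cc.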
rtc_library("default_task_queue_factory") {
visibility = [ "*" ]
if (!is_ios && !is_android) {
poisonous = [ "default_task_queue" ]
}
sources = [ "default_task_queue_factory.h" ]
deps = [
":task_queue",
"../../api:field_trials_view",
"../../rtc_base/memory:always_valid_pointer",
]
if (rtc_enable_libevent) {
if (is_android) {
sources +=
[ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
deps += [
"../../api/transport:field_trial_based_config",
"../../rtc_base:logging",
"../../rtc_base:rtc_task_queue_libevent",
"../../rtc_base:rtc_task_queue_stdlib",
]
} else {
sources += [ "default_task_queue_factory_libevent.cc" ]
deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
}
} else if (is_mac || is_ios) {
sources += [ "default_task_queue_factory_gcd.cc" ]
deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
} else if (is_win && current_os != "winuwp") {
sources += [ "default_task_queue_factory_win.cc" ]
deps += [ "../../rtc_base:rtc_task_queue_win" ]
} else {
sources += [ "default_task_queue_factory_stdlib.cc" ]
deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
}
}
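That covers the TaskQueue architecture at the API level: build flags plus per-platform factory functions expose one uniform interface for creating TaskQueues, while the concrete implementations live under rtc_base.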
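The pattern described here (one factory interface, several backends, one entry point chosen at build time, and destruction routed through Delete()) can be sketched in a single file. This is only an illustration under assumptions: QueueBase, QueueDeleter, QueueFactory, FakeQueue and the two factories are hypothetical names, and a preprocessor check on _WIN32 stands in for the BUILD.gn choice of which .cc file gets compiled.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for TaskQueueBase, TaskQueueDeleter and
// TaskQueueFactory; none of these names exist in WebRTC.
class QueueBase {
 public:
  virtual void Delete() = 0;
  virtual std::string Describe() const = 0;

 protected:
  virtual ~QueueBase() = default;  // Owners must go through Delete().
};

struct QueueDeleter {
  void operator()(QueueBase* queue) const { queue->Delete(); }
};
using QueuePtr = std::unique_ptr<QueueBase, QueueDeleter>;

class QueueFactory {
 public:
  virtual ~QueueFactory() = default;
  virtual QueuePtr CreateQueue(const std::string& name) const = 0;
};

// Only records which backend built it. A real backend would own a thread
// and an event loop.
class FakeQueue final : public QueueBase {
 public:
  FakeQueue(std::string backend, std::string name)
      : backend_(std::move(backend)), name_(std::move(name)) {
    assert(!name_.empty());
  }
  void Delete() override { delete this; }
  std::string Describe() const override { return backend_ + ":" + name_; }

 private:
  ~FakeQueue() override = default;
  std::string backend_;
  std::string name_;
};

class WinLikeFactory final : public QueueFactory {
 public:
  QueuePtr CreateQueue(const std::string& name) const override {
    return QueuePtr(new FakeQueue("win", name));
  }
};

class StdlibLikeFactory final : public QueueFactory {
 public:
  QueuePtr CreateQueue(const std::string& name) const override {
    return QueuePtr(new FakeQueue("stdlib", name));
  }
};

// In WebRTC the build decides which .cc file defines this function; the
// preprocessor check plays that role in this single-file sketch.
std::unique_ptr<QueueFactory> CreateDefaultQueueFactory() {
#if defined(_WIN32)
  return std::make_unique<WinLikeFactory>();
#else
  return std::make_unique<StdlibLikeFactory>();
#endif
}
```

Callers only ever see CreateDefaultQueueFactory() and QueuePtr, so moving to another backend needs no change on their side. The custom deleter keeps the destructor non-public, in the same spirit as TaskQueueDeleter calling Delete().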
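- rtc_base implementations of TaskQueueBase and TaskQueueFactory
- rtc_base/platform_thread.cc
A cross-platform thread wrapper: on Windows it creates the thread with CreateThread, and on Linux with pthread_create. A priority can be set when the thread is created; the priorities are defined as follows: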
enum class ThreadPriority {
kLow = 1,
kNormal,
kHigh,
kRealtime,
};
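- Subclasses of TaskQueueBase and TaskQueueFactory, and the functions that create the corresponding TaskQueueFactory instances
The implementations differ in their details, mainly in the event-loop mechanism.
- task_queue_win.h \ task_queue_win.cc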
namespace webrtc {
class TaskQueueWin : public TaskQueueBase {
..
};
class TaskQueueWinFactory : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
}
};
std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
return std::make_unique<TaskQueueWinFactory>();
}
} // namespace webrtc
- task_queue_stdlib.h \ task_queue_stdlib.cc
namespace webrtc {
class TaskQueueStdlib final : public TaskQueueBase {
..
};
class TaskQueueStdlibFactory final : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
}
};
std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
return std::make_unique<TaskQueueStdlibFactory>();
}
} // namespace webrtc
- task_queue_libevent.h \ task_queue_libevent.cc
namespace webrtc {
namespace {
class TaskQueueLibevent final : public TaskQueueBase {
..
};
class TaskQueueLibeventFactory final : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueLibevent(name,
TaskQueuePriorityToThreadPriority(priority)));
}
};
} // namespace
std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
return std::make_unique<TaskQueueLibeventFactory>();
}
} // namespace webrtc
- task_queue_gcd.h \ task_queue_gcd.cc
namespace webrtc {
namespace {
class TaskQueueGcd final : public TaskQueueBase {
..
};
class TaskQueueGcdFactory final : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
}
};
} // namespace
std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
return std::make_unique<TaskQueueGcdFactory>();
}
} // namespace webrtc
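The thread-based factories above pass the requested TaskQueueFactory::Priority through TaskQueuePriorityToThreadPriority before creating the queue. The body of that helper is not shown in this post, so the sketch below is only a guess at its shape: the two enums are copied from the listings above, while MapPriority and the particular mapping (notably HIGH to kRealtime) are assumptions that should be checked against the m115 sources.

```cpp
#include <cassert>

// Copied from the listings above.
enum class Priority { NORMAL = 0, HIGH, LOW };
enum class ThreadPriority { kLow = 1, kNormal, kHigh, kRealtime };

// Hypothetical helper: which thread priority each level maps to is an
// assumption of this sketch, not a quote from WebRTC.
inline ThreadPriority MapPriority(Priority priority) {
  switch (priority) {
    case Priority::HIGH:
      return ThreadPriority::kRealtime;
    case Priority::LOW:
      return ThreadPriority::kLow;
    case Priority::NORMAL:
      return ThreadPriority::kNormal;
  }
  assert(false && "every Priority value is handled above");
  return ThreadPriority::kNormal;
}
```

The GCD factory is the exception in the listings: it calls TaskQueuePriorityToGCD instead, since the GCD backend takes queue priorities rather than thread priorities.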
rtc::Thread
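The implementation is in rtc_base/thread.h and rtc_base/thread.cc.
- rtc::Thread and ThreadManager
Besides TaskQueue's PostTask and PostDelayedTask methods, rtc::Thread offers the following features, which make it easier to monitor the health of all threads:
- A blocking-call interface, BlockingCall; a thread can also be configured to disallow blocking calls;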
void BlockingCall(
FunctionView<void()> functor,
const webrtc::Location& location = webrtc::Location::Current()) {
BlockingCallImpl(std::move(functor), location);
}
template <typename Functor,
typename ReturnT = std::invoke_result_t<Functor>,
typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
ReturnT BlockingCall(
Functor&& functor,
const webrtc::Location& location = webrtc::Location::Current()) {
ReturnT result;
BlockingCall([&] { result = std::forward<Functor>(functor)(); }, location);
return result;
}
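- Counting how many blocking calls the current thread makes over a period and writing the result to the log; see the RTC_LOG_THREAD_BLOCK_COUNT macro;
- Guarding against too many blocking calls; see the RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN macro;
- Debugging blocking calls; see RegisterSendAndCheckForCycles, which checks for cycles among blocking calls between threads and so helps detect a deadlocked target thread;
- Checking that all threads are healthy; see ProcessAllMessageQueuesInternal;
- Measuring how long each task takes and logging a warning when it exceeds a threshold; see dispatch_warning_ms_ in Dispatch: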
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
TRACE_EVENT0("webrtc", "Thread::Dispatch");
RTC_DCHECK_RUN_ON(this);
int64_t start_time = TimeMillis();
std::move(task)();
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= dispatch_warning_ms_) {
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
<< "ms to dispatch.";
// To avoid log spew, move the warning limit to only give warning
// for delays that are larger than the one observed.
dispatch_warning_ms_ = diff + 1;
}
}
- A time limit on a single pass of the event loop, after which the thread goes into a wait state; see the implementation of Get.
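Two of the mechanisms listed above, BlockingCall and the slow-task warning in Dispatch, can be sketched together on a small worker thread. This is an illustration under assumptions, not rtc::Thread: MonitoredWorker and its members are hypothetical names, std::function replaces FunctionView and absl::AnyInvocable, a mutex plus condition variable stands in for whatever event primitive the real implementation uses, and the log line goes to std::clog.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical worker thread; a sketch of the ideas, not rtc::Thread.
class MonitoredWorker {
 public:
  using Task = std::function<void()>;
  explicit MonitoredWorker(std::int64_t warning_ms)
      : warning_ms_(warning_ms), thread_([this] { Run(); }) {}
  ~MonitoredWorker() {
    assert(!IsCurrent());  // Joining from the worker itself would deadlock.
    Post(nullptr);         // An empty task tells Run() to exit.
    thread_.join();
  }
  void Post(Task task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    wake_.notify_one();
  }
  bool IsCurrent() const {
    return std::this_thread::get_id() == thread_.get_id();
  }

  // Runs `functor` on the worker and blocks the caller until it has finished.
  void BlockingCall(const Task& functor) {
    if (IsCurrent()) return functor();  // Already on the worker: run inline.
    struct Signal { std::mutex mutex; std::condition_variable cv; bool done = false; };
    auto signal = std::make_shared<Signal>();
    Post([&functor, signal] {
      functor();
      std::lock_guard<std::mutex> lock(signal->mutex);
      signal->done = true;
      signal->cv.notify_one();
    });
    std::unique_lock<std::mutex> lock(signal->mutex);
    signal->cv.wait(lock, [&signal] { return signal->done; });
  }

  // Overload for functors that return a value, as in the listing above.
  template <typename Functor,
            typename ReturnT = std::invoke_result_t<Functor>,
            typename = std::enable_if_t<!std::is_void_v<ReturnT>>>
  ReturnT BlockingCall(Functor&& functor) {
    ReturnT result;
    BlockingCall([&] { result = std::forward<Functor>(functor)(); });
    return result;
  }

  // Read this on the worker only (for example through BlockingCall).
  std::int64_t warning_ms() const { return warning_ms_; }

 private:
  void Dispatch(const Task& task) {
    using namespace std::chrono;
    const auto start = steady_clock::now();
    task();
    const std::int64_t diff =
        duration_cast<milliseconds>(steady_clock::now() - start).count();
    if (diff >= warning_ms_) {
      std::clog << "Task took " << diff << " ms to dispatch.\n";
      warning_ms_ = diff + 1;  // Only warn again for an even slower task.
    }
  }
  void Run() {
    for (;;) {
      Task task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        wake_.wait(lock, [this] { return !tasks_.empty(); });
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      if (!task) return;
      Dispatch(task);
    }
  }

  std::int64_t warning_ms_;  // Touched only by the worker once it is running.
  std::mutex mutex_;
  std::condition_variable wake_;
  std::deque<Task> tasks_;
  std::thread thread_;  // Declared last so Run() sees initialized members.
};
```

A call such as `int n = worker.BlockingCall([] { return 6 * 7; });` runs the lambda on the worker and hands the value back to the caller. Raising the threshold after every warning follows the comment in Dispatch: it avoids log spew by reporting only delays larger than any seen so far.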