Source Code Analysis of WebRTC's Threading Module Design

Author: 耐寒 | Published: 2023-09-06 10:25

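In a large project, whether WebRTC or an in-house codebase, the threading model becomes critical once the design is concurrent; to a large degree it determines whether the project succeeds. WebRTC's threading model is worth studying in depth. Most write-ups online are based on the m85 branch or older, so this article summarizes it from the more recent m115 branch. The design ideas are largely unchanged, but the implementation differs in many places.
In WebRTC a TaskQueue generally corresponds to one thread, and there are two thread wrappers. The first is rtc::Thread (itself an implementation of TaskQueueBase), which owns a SocketServer and is used to handle network-related requests; PeerConnection, for example, has a network_thread, a worker_thread and a signaling_thread. The second is TaskQueue, which also wraps a thread and is used widely; audio/video encoding, decoding and rendering each have their own TaskQueue threads. How are these threads implemented, and how do the two kinds differ? The figure below, borrowed from the web, summarizes WebRTC's threading model for reference; later articles will dig further into the WebRTC source.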


[Figure: overview of the WebRTC threading model]

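The model has three parts: the API-level TaskQueue interfaces, the concrete TaskQueue implementations, and rtc::Thread.

API-level TaskQueue interfaces
This layer defines the two most important abstract base classes, TaskQueueBase and TaskQueueFactory.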

  • TaskQueueBase
    Provides PostTask, PostDelayedTask, PostDelayedHighPrecisionTask and PostDelayedTaskWithPrecision. These forward to the pure virtual PostTaskImpl and PostDelayedTaskImpl, which subclasses such as TaskQueueWin under rtc_base implement.
namespace webrtc {

class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
public:
  enum class DelayPrecision {
    // This may include up to a 17 ms leeway in addition to OS timer precision.
    // See PostDelayedTask() for more information.
    kLow,

    // This does not have the additional delay that kLow has, but it is still
    // limited by OS timer precision. See PostDelayedHighPrecisionTask() for
    // more information.
    kHigh,
  };

  virtual void Delete() = 0;
  void PostTask(absl::AnyInvocable<void() &&> task,
                const Location& location = Location::Current()) {
    PostTaskImpl(std::move(task), PostTaskTraits{}, location);
  }

  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
                       TimeDelta delay,
                       const Location& location = Location::Current()) {
    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = false},
                        location);
  }

  void PostDelayedHighPrecisionTask(
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = true},
                        location);
  }

  // As specified by `precision`, calls either PostDelayedTask() or
  // PostDelayedHighPrecisionTask().
  void PostDelayedTaskWithPrecision(
      DelayPrecision precision,
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    switch (precision) {
      case DelayPrecision::kLow:
        PostDelayedTask(std::move(task), delay, location);
        break;

      case DelayPrecision::kHigh:
        PostDelayedHighPrecisionTask(std::move(task), delay, location);
        break;
    }
  }

  // Returns the task queue that is running the current thread.
  // Returns nullptr if this thread is not associated with any task queue.
  // May be called on any thread or task queue, including this task queue.

  static TaskQueueBase* Current();
  bool IsCurrent() const { return Current() == this; }

protected:
  // This is currently only present here to simplify introduction of future
  // planned task queue changes.
  struct PostTaskTraits {};

  struct PostDelayedTaskTraits {
    // If `high_precision` is false, tasks may execute within up to a 17 ms
    // leeway in addition to OS timer precision. Otherwise the task should be
    // limited to OS timer precision. See PostDelayedTask() and
    // PostDelayedHighPrecisionTask() for more information.

    bool high_precision = false;
  };

  class RTC_EXPORT CurrentTaskQueueSetter {
   public:
    explicit CurrentTaskQueueSetter(TaskQueueBase* task_queue);
    CurrentTaskQueueSetter(const CurrentTaskQueueSetter&) = delete;
    CurrentTaskQueueSetter& operator=(const CurrentTaskQueueSetter&) = delete;
    ~CurrentTaskQueueSetter();

   private:
    TaskQueueBase* const previous_;
  };

  // Subclasses should implement this method to support the behavior defined in
  // the PostTask and PostTaskTraits docs above.
  virtual void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                            const PostTaskTraits& traits,
                            const Location& location) = 0;

  // Subclasses should implement this method to support the behavior defined in
  // the PostDelayedTask/PostHighPrecisionDelayedTask and PostDelayedTaskTraits
  // docs above.

  virtual void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                                   TimeDelta delay,
                                   const PostDelayedTaskTraits& traits,
                                   const Location& location) = 0;

  // Users of the TaskQueue should call Delete instead of directly deleting
  // this object.

  virtual ~TaskQueueBase() = default;
};

struct TaskQueueDeleter {
  void operator()(TaskQueueBase* task_queue) const { task_queue->Delete(); }
};

}  // namespace webrtc
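
The structure of TaskQueueBase (non-virtual PostTask forwarding to a virtual PostTaskImpl, plus a per-thread "current queue" pointer maintained by an RAII setter) can be sketched without any WebRTC dependency. This is only an illustration under assumptions: MiniTaskQueueBase, SimpleThreadQueue and RanOnQueue are hypothetical names, std::function stands in for absl::AnyInvocable, and delayed tasks and Location are left out.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical, simplified stand-in for webrtc::TaskQueueBase.
class MiniTaskQueueBase {
 private:
  // One pointer per OS thread: the queue that thread is currently running.
  static inline thread_local MiniTaskQueueBase* current_ = nullptr;

 public:
  // Non-virtual entry point; the real work is done by the subclass hook.
  void PostTask(std::function<void()> task) { PostTaskImpl(std::move(task)); }

  static MiniTaskQueueBase* Current() { return current_; }
  bool IsCurrent() const { return Current() == this; }

 protected:
  virtual ~MiniTaskQueueBase() = default;
  virtual void PostTaskImpl(std::function<void()> task) = 0;

  // RAII helper: marks `queue` as current for the lifetime of the object.
  class CurrentSetter {
   public:
    explicit CurrentSetter(MiniTaskQueueBase* queue) : previous_(current_) {
      current_ = queue;
    }
    ~CurrentSetter() { current_ = previous_; }

   private:
    MiniTaskQueueBase* const previous_;
  };
};

// Hypothetical subclass backed by one std::thread and a mutex-guarded deque.
class SimpleThreadQueue final : public MiniTaskQueueBase {
 public:
  SimpleThreadQueue() : worker_([this] { Run(); }) {}
  ~SimpleThreadQueue() override {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quit_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

 private:
  void PostTaskImpl(std::function<void()> task) override {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  void Run() {
    CurrentSetter setter(this);
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return quit_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // Quit requested and queue drained.
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();  // Run outside the lock so tasks may post more tasks.
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool quit_ = false;
  std::thread worker_;  // Declared last: the members above must exist first.
};

// Posts one task and reports whether it saw itself running on `queue`.
bool RanOnQueue(SimpleThreadQueue& queue) {
  std::promise<bool> done;
  std::future<bool> result = done.get_future();
  queue.PostTask([&] { done.set_value(queue.IsCurrent()); });
  return result.get();
}
```

Keeping the public methods non-virtual lets the base class fix the calling convention once, while each platform only has to supply the two Impl hooks.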

  • TaskQueueFactory
    The abstract base class of the factories that create TaskQueue instances.
namespace webrtc {

// The implementation of this interface must be thread-safe.
class TaskQueueFactory {

public:
  // TaskQueue priority levels. On some platforms these will map to thread
  // priorities, on others such as Mac and iOS, GCD queue priorities.
  enum class Priority { NORMAL = 0, HIGH, LOW };

  virtual ~TaskQueueFactory() = default;
  virtual std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const = 0;
};

}  // namespace webrtc
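
The factory hands back a std::unique_ptr with a custom deleter, so releasing a queue always goes through Delete() rather than a plain delete. The sketch below shows that ownership pattern in isolation. QueueBase, QueueDeleter, FakeQueue, FakeQueueFactory and the live_count counter are hypothetical names for this example, and FakeQueue runs no thread; a real Delete() would presumably also stop the underlying thread before freeing the object.

```cpp
#include <memory>
#include <string>
#include <string_view>

// Hypothetical stand-in for TaskQueueBase: destruction only via Delete().
class QueueBase {
 public:
  virtual void Delete() = 0;
  virtual std::string name() const = 0;

 protected:
  virtual ~QueueBase() = default;
};

// Same idea as TaskQueueDeleter: unique_ptr calls Delete(), not delete.
struct QueueDeleter {
  void operator()(QueueBase* queue) const { queue->Delete(); }
};

using QueuePtr = std::unique_ptr<QueueBase, QueueDeleter>;

// Hypothetical stand-in for TaskQueueFactory.
class QueueFactory {
 public:
  enum class Priority { NORMAL = 0, HIGH, LOW };
  virtual ~QueueFactory() = default;
  virtual QueuePtr CreateQueue(std::string_view name,
                               Priority priority) const = 0;
};

// A queue that does nothing except track how many instances are alive.
class FakeQueue final : public QueueBase {
 public:
  static inline int live_count = 0;

  explicit FakeQueue(std::string_view name) : name_(name) { ++live_count; }
  void Delete() override { delete this; }
  std::string name() const override { return name_; }

 private:
  ~FakeQueue() override { --live_count; }
  std::string name_;
};

class FakeQueueFactory final : public QueueFactory {
 public:
  QueuePtr CreateQueue(std::string_view name,
                       Priority /*priority*/) const override {
    return QueuePtr(new FakeQueue(name));
  }
};

// Mirrors the shape of the CreateTaskQueue*Factory() functions.
std::unique_ptr<QueueFactory> CreateFakeQueueFactory() {
  return std::make_unique<FakeQueueFactory>();
}
```

Because the destructors are not public, callers cannot accidentally destroy a queue directly; the only way to release it is to let the QueuePtr go out of scope.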

The CreateDefaultTaskQueueFactory interface
CreateDefaultTaskQueueFactory is declared in default_task_queue_factory.h:

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials = nullptr);
}  // namespace webrtc

CreateDefaultTaskQueueFactory is defined in each of default_task_queue_factory_win.cc, default_task_queue_factory_stdlib.cc, default_task_queue_factory_stdlib_or_libevent_experiment.cc, default_task_queue_factory_libevent.cc and default_task_queue_factory_gcd.cc. The definition in default_task_queue_factory_win.cc is shown below; it calls CreateTaskQueueWinFactory, which is in turn defined in rtc_base/task_queue_win.cc.

// api/task_queue/default_task_queue_factory_win.cc

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials) {
  return CreateTaskQueueWinFactory();
}

}  // namespace webrtc

// rtc_base/task_queue_win.cc

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

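CreateDefaultTaskQueueFactory has several definitions, so which one is used? That is decided at build time: BUILD.gn in api/task_queue selects the source file according to build flags. For example, Android (with rtc_enable_libevent set) uses default_task_queue_factory_stdlib_or_libevent_experiment.cc, and Windows uses default_task_queue_factory_win.cc.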

rtc_library("default_task_queue_factory") {
  visibility = [ "*" ]
  if (!is_ios && !is_android) {
    poisonous = [ "default_task_queue" ]
  }

  sources = [ "default_task_queue_factory.h" ]
  deps = [
    ":task_queue",
    "../../api:field_trials_view",
    "../../rtc_base/memory:always_valid_pointer",
  ]

  if (rtc_enable_libevent) {
    if (is_android) {
      sources +=
          [ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
      deps += [
        "../../api/transport:field_trial_based_config",
        "../../rtc_base:logging",
        "../../rtc_base:rtc_task_queue_libevent",
        "../../rtc_base:rtc_task_queue_stdlib",
      ]
    } else {
      sources += [ "default_task_queue_factory_libevent.cc" ]
      deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
    }
  } else if (is_mac || is_ios) {
    sources += [ "default_task_queue_factory_gcd.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
  } else if (is_win && current_os != "winuwp") {
    sources += [ "default_task_queue_factory_win.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_win" ]
  } else {
    sources += [ "default_task_queue_factory_stdlib.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
  }
}

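That covers the API-level design of TaskQueue: build flags combined with per-platform factory functions expose a single, uniform interface for creating a TaskQueue, while the concrete implementations live under rtc_base.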
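
The "one declaration, one definition chosen at build time" idea can be imitated in a single file with preprocessor checks. This is only an analogy: WebRTC makes the choice in GN by compiling a different .cc file, not with #if, and the Factory, WinFactory, GcdFactory, StdlibFactory and CreateDefaultFactory names below are hypothetical.

```cpp
#include <memory>
#include <string>

// Hypothetical, trimmed-down factory interface for this sketch.
class Factory {
 public:
  virtual ~Factory() = default;
  virtual std::string Backend() const = 0;
};

// Exactly one definition of CreateDefaultFactory() is compiled,
// much like exactly one default_task_queue_factory_*.cc is built.
#if defined(_WIN32)
class WinFactory final : public Factory {
 public:
  std::string Backend() const override { return "win"; }
};
std::unique_ptr<Factory> CreateDefaultFactory() {
  return std::make_unique<WinFactory>();
}
#elif defined(__APPLE__)
class GcdFactory final : public Factory {
 public:
  std::string Backend() const override { return "gcd"; }
};
std::unique_ptr<Factory> CreateDefaultFactory() {
  return std::make_unique<GcdFactory>();
}
#else
class StdlibFactory final : public Factory {
 public:
  std::string Backend() const override { return "stdlib"; }
};
std::unique_ptr<Factory> CreateDefaultFactory() {
  return std::make_unique<StdlibFactory>();
}
#endif
```

Callers only ever see CreateDefaultFactory(), so no call site needs to change when the platform does.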

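rtc_base implementations of TaskQueueBase and TaskQueueFactory
  • rtc_base/platform_thread.cc
    A cross-platform thread wrapper: it calls CreateThread on Windows and pthread_create on Linux. The thread priority can be set when the thread is created; the priorities are defined as follows: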
enum class ThreadPriority {
  kLow = 1,
  kNormal,
  kHigh,
  kRealtime,
};
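  • Subclasses of TaskQueueBase and TaskQueueFactory, and the functions that create each TaskQueueFactory instance
    The implementations differ in their details, mainly in the event-loop mechanism.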
  • task_queue_win.h \ task_queue_win.cc
namespace webrtc {
class TaskQueueWin : public TaskQueueBase {
  ..
};

class TaskQueueWinFactory : public TaskQueueFactory {
public:

  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

}  // namespace webrtc
  • task_queue_stdlib.h \ task_queue_stdlib.cc
namespace webrtc {
class TaskQueueStdlib final : public TaskQueueBase {
  ..
};

class TaskQueueStdlibFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
  return std::make_unique<TaskQueueStdlibFactory>();
}

} // namespace webrtc
  • task_queue_libevent.h \ task_queue_libevent.cc
namespace webrtc {
namespace {
class TaskQueueLibevent final : public TaskQueueBase {
  ..

};

class TaskQueueLibeventFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueLibevent(name,
                              TaskQueuePriorityToThreadPriority(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
  return std::make_unique<TaskQueueLibeventFactory>();
}

}  // namespace webrtc
  • task_queue_gcd.h \ task_queue_gcd.cc
namespace webrtc {
namespace {
class TaskQueueGcd final : public TaskQueueBase {
  ..
};

class TaskQueueGcdFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
  return std::make_unique<TaskQueueGcdFactory>();
}

}  // namespace webrtc
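
The Win, Stdlib and Libevent factories above all pass the requested priority through TaskQueuePriorityToThreadPriority, while the GCD factory uses TaskQueuePriorityToGCD instead. The article does not show the thread-priority helper, so the sketch below is an assumption about its shape: a plain switch from the factory priority to the ThreadPriority enum listed earlier. The mapping of HIGH to kRealtime follows my reading of the m115 sources and should be checked against your own checkout; the enums are redeclared locally and ToThreadPriority is a hypothetical name.

```cpp
// Local copies of the two enums so the sketch compiles on its own.
enum class Priority { NORMAL = 0, HIGH, LOW };  // TaskQueueFactory::Priority
enum class ThreadPriority { kLow = 1, kNormal, kHigh, kRealtime };

// Assumed mapping (verify against the real TaskQueuePriorityToThreadPriority).
constexpr ThreadPriority ToThreadPriority(Priority priority) {
  switch (priority) {
    case Priority::HIGH:
      return ThreadPriority::kRealtime;
    case Priority::LOW:
      return ThreadPriority::kLow;
    case Priority::NORMAL:
      return ThreadPriority::kNormal;
  }
  return ThreadPriority::kNormal;  // Only reached for out-of-range values.
}

static_assert(ToThreadPriority(Priority::NORMAL) == ThreadPriority::kNormal,
              "NORMAL should stay at normal thread priority");
```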

rtc::Thread

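The implementation is in rtc_base/thread.h and rtc_base/thread.cc.

  • rtc::Thread and ThreadManager

Besides the PostTask and PostDelayedTask methods it shares with TaskQueue, rtc::Thread offers the following features, which make it easier to monitor the health of all threads:

  • A blocking-call interface, BlockingCall; a thread can also be configured to disallow blocking calls.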
  void BlockingCall(
      FunctionView<void()> functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    BlockingCallImpl(std::move(functor), location);
  }

  template <typename Functor,
            typename ReturnT = std::invoke_result_t<Functor>,
            typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
  ReturnT BlockingCall(
      Functor&& functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    ReturnT result;
    BlockingCall([&] { result = std::forward<Functor>(functor)(); }, location);
    return result;
  }
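
The two BlockingCall overloads above can be tried out on a toy worker thread. The sketch below is not rtc::Thread: Worker and its members are hypothetical, std::function replaces FunctionView, and the wait uses std::promise where the real BlockingCallImpl has its own signalling. What it keeps is the overload structure: a void version that posts the functor and waits, and a value-returning template that is layered on top of it.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical minimal worker thread; an empty task is the quit signal.
class Worker {
 public:
  Worker() : thread_([this] { Run(); }) {}
  ~Worker() {
    Post(nullptr);
    thread_.join();
  }

  bool IsCurrent() const { return current_ == this; }

  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Runs `functor` on the worker and blocks the caller until it finishes.
  void BlockingCall(const std::function<void()>& functor) {
    if (IsCurrent()) {  // Already on the worker: run inline, do not wait.
      functor();
      return;
    }
    std::promise<void> done;
    Post([&] {
      functor();
      done.set_value();
    });
    done.get_future().wait();
  }

  // Value-returning overload, enabled only for non-void functors.
  template <typename Functor,
            typename ReturnT = std::invoke_result_t<Functor>,
            typename = std::enable_if_t<!std::is_void_v<ReturnT>>>
  ReturnT BlockingCall(Functor&& functor) {
    ReturnT result;
    BlockingCall(std::function<void()>(
        [&] { result = std::forward<Functor>(functor)(); }));
    return result;
  }

 private:
  void Run() {
    current_ = this;
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !tasks_.empty(); });
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      if (!task) return;  // Empty task posted by the destructor.
      task();
    }
  }

  static inline thread_local Worker* current_ = nullptr;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  std::thread thread_;  // Declared last: the members above must exist first.
};
```

As in the original, the value-returning overload needs ReturnT to be default-constructible and assignable, because the result is filled in by the worker and then returned by the caller.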
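  • Counting how many blocking calls the current thread makes within a scope and writing the result to the log; see the RTC_LOG_THREAD_BLOCK_COUNT macro.
  • Guarding against too many blocking calls; see the RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN macro.
  • Debugging blocking calls: see RegisterSendAndCheckForCycles, which checks for cycles among pending blocking calls so that a potential deadlock with the target thread can be detected.
  • Checking that all threads are still processing messages; see ProcessAllMessageQueuesInternal.
  • Measuring how long each task takes to run and logging a warning when it exceeds a threshold; see Dispatch and dispatch_warning_ms_.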
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
  TRACE_EVENT0("webrtc", "Thread::Dispatch");
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  std::move(task)();
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch.";

    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}
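
The self-raising threshold in Dispatch is easy to miss: after each warning the limit becomes the observed duration plus one, so only a task slower than any seen so far triggers the next warning. The sketch below isolates that logic with an injectable clock so it can be exercised deterministically. SlowTaskMonitor and its members are hypothetical names, and warnings are collected in a vector where the real code writes to RTC_LOG.

```cpp
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper that mirrors the threshold logic of Thread::Dispatch.
class SlowTaskMonitor {
 public:
  using Clock = std::function<int64_t()>;  // Returns the time in milliseconds.

  SlowTaskMonitor(int64_t warning_ms, Clock now_ms)
      : warning_ms_(warning_ms), now_ms_(std::move(now_ms)) {}

  // Runs `task`, records its duration if it reached the current threshold,
  // and then raises the threshold just above that duration.
  void Dispatch(const std::function<void()>& task) {
    const int64_t start = now_ms_();
    task();
    const int64_t diff = now_ms_() - start;
    if (diff >= warning_ms_) {
      warnings_.push_back(diff);
      warning_ms_ = diff + 1;  // Avoid log spew for equally slow tasks.
    }
  }

  int64_t warning_ms() const { return warning_ms_; }
  const std::vector<int64_t>& warnings() const { return warnings_; }

 private:
  int64_t warning_ms_;
  Clock now_ms_;
  std::vector<int64_t> warnings_;
};
```

With a fake clock that each task advances by hand, a 60 ms task against a 50 ms limit is recorded once, a second 60 ms task is not (the limit is now 61), and a 100 ms task is recorded again and moves the limit to 101.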
  • A time limit on a single iteration of the event loop, after which the thread goes into a wait state; see the implementation of Get.

Reference:

https://zhuanlan.zhihu.com/p/136070941
