美文网首页
licode笔记之线程模型ThreadPool和Worker

licode笔记之线程模型ThreadPool和Worker

作者: 云上听风 | 来源:发表于2019-03-22 15:42 被阅读0次

    Scheduler


    ThreadPool构造函数中创建了一个Scheduler对象。
    Scheduler构造参数kNumThreadsPerScheduler,值为2。
    Scheduler使用boost::thread_group创建了kNumThreadsPerScheduler个线程,也就是2个线程。
    线程方法为Scheduler::serviceQueue。
    在Scheduler::serviceQueue方法中,循环检测任务队列判断超时来执行到期任务。
    使用了条件变量std::condition_variable和互斥锁std::mutex来并发任务。

    这个Scheduler实例同时又被传入下面的Worker对象中用来实现定时计划任务。

    Worker


    ThreadPool构造时传入numWorkers,默认配置文件设置此值为24,根据numWorkers创建对应的worker,保存在workers队列。
    Worker构造函数中创建了service_和service_worker_:

    service_{},
    service_worker_{new asio_worker::element_type(service_)}
    

    service_和service_worker_的定义:

    typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
    boost::asio::io_service service_;
    asio_worker service_worker_;
    

    创建ThreadPool之后,要调用ThreadPool::start()

    void ThreadPool::start() {
      std::vector<std::shared_ptr<std::promise<void>>> promises(workers_.size());
      int index = 0;
      for (auto worker : workers_) {
        promises[index] = std::make_shared<std::promise<void>>();
        worker->start(promises[index++]);
      }
      for (auto promise : promises) {
        promise->get_future().wait();
      }
    }
    

    逐个调用 worker->start(promises[index++]);

    void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
      auto this_ptr = shared_from_this();
      auto worker = [this_ptr, start_promise] {
        start_promise->set_value();
        if (!this_ptr->closed_) {
          return this_ptr->service_.run();
       }
        return size_t(0);
      };
      group_.add_thread(new boost::thread(worker));
    }
    

    每个Woker内部都创建了一个线程,在线程中把boost::asio::io_service service_启动run起来.
    因为io_service关联了一个io_service::work任务对象,所以io_service会一直运行。

    Worker的使用者调用Worker::task将Task任务放入队列中等待执行。

    void Worker::task(Task f) {
      service_.post(f);
    }
    

    用户可以使用ThreadPool::getLessUsedWorker()从workers中获取一个当前引用计数最少的Worker:

    std::shared_ptr<Worker> ThreadPool::getLessUsedWorker() {
      std::shared_ptr<Worker> chosen_worker = workers_.front();
      for (auto worker : workers_) {
        if (chosen_worker.use_count() > worker.use_count()) {
          chosen_worker = worker;
        }
      }
      return chosen_worker;
    }
    

    线程安全


    如果想要一组Task同步执行那么需要在同一个Worker实例中加入Task。
    如果想并发执行Task,那么可以每次从ThreadPool::getLessUsedWorker()取出一个Worker来加入Task。
    因为Task执行在Worker的线程中,跟调用者不在同一线程,如果Task中的代码跟调用者有资源竞争那么需要自己实现加锁。

    因为licode使用node.js做为sdk库的调用者,所以创建的WebRtcConnection和MediaStream都执行在Node.js中的单线程中。
    每个WebRtcConnection和MediaStream又独立使用Worker对象来执行异步线程任务。
    所以WebRtcConnection和MediaStream的内部代码执行在两个线程下,一个Node.js主线程,一个Worker线程。
    当释放WebRtcConnection和MediaStream对象时,在Node.js中调用对象的close方法同时释放对象引用,close方法会在Worker中执行syncClose来释放内部资源。

    我看了一下加入到Worker中的Task基本都没有加锁,那么开发时需要严格保证两个线程中的资源不能冲突。

    关于IOThreadPool和IOWorker


    默认配置在整个系统中只创建了一个IOWorker。
    实际上IOWorker只在使用NicerConnection时才有用,而如果使用LibNiceConnection,那么此IOWorker不需要start,也就没有任何作用。
    因为libnice有自己的内部线程,所以不需要外部线程来处理IO。

    libnice收到数据后回调LibNiceConnection::onData,加锁boost::mutex::scoped_lock lock(close_mutex_);检测当前IceState状态。
    在LibNiceConnection::close也会加锁设置IceState并且释放libnice。

    相关文章

      网友评论

          本文标题:licode笔记之线程模型ThreadPool和Worker

          本文链接:https://www.haomeiwen.com/subject/gkaamqtx.html